tessl/maven-org-springframework-ai--spring-ai-starter-mcp-client-webflux

Spring Boot starter providing auto-configuration for Model Context Protocol (MCP) client with Spring WebFlux, enabling reactive AI applications to connect to MCP servers via SSE and Streamable HTTP transports

SSE Transport Auto-Configuration

Auto-configuration for WebFlux-based Server-Sent Events (SSE) client transport in the Model Context Protocol (MCP).

Overview

The SSE transport auto-configuration provides reactive streaming from MCP servers to clients using Server-Sent Events. The SSE stream itself is unidirectional (server to client); client requests travel to the server as separate HTTP POST calls. This suits scenarios where the server needs to push updates, notifications, or streaming data to the client over a persistent connection.

Key Features

  • Unidirectional Streaming: Server to client data flow
  • Persistent Connections: Long-lived connections maintained by WebFlux
  • Reactive: Built on Spring WebClient and Project Reactor
  • Automatic Reconnection: Handles connection failures gracefully
  • WebFlux-Specific: Only available in WebFlux-based applications

Auto-Configuration Class

package org.springframework.ai.mcp.client.webflux.autoconfigure;

/**
 * Auto-configuration for WebFlux-based SSE client transport.
 * Conditionally enabled based on class presence and properties.
 * Creates transport beans for each configured SSE connection.
 * Thread-safe - transports use reactive streams.
 */
@org.springframework.boot.autoconfigure.AutoConfiguration
@org.springframework.boot.autoconfigure.condition.ConditionalOnClass(io.modelcontextprotocol.client.transport.WebFluxSseClientTransport.class)
@org.springframework.boot.context.properties.EnableConfigurationProperties({
    org.springframework.ai.mcp.client.common.autoconfigure.properties.McpSseClientProperties.class,
    org.springframework.ai.mcp.client.common.autoconfigure.properties.McpClientCommonProperties.class
})
@org.springframework.boot.autoconfigure.condition.ConditionalOnProperty(
    prefix = "spring.ai.mcp.client",
    name = "enabled",
    havingValue = "true",
    matchIfMissing = true
)
public class SseWebFluxTransportAutoConfiguration {
}

Conditional Activation

This auto-configuration is conditionally enabled when:

  1. Class Presence: io.modelcontextprotocol.client.transport.WebFluxSseClientTransport is on the classpath
  2. Property Check: spring.ai.mcp.client.enabled is true (default) or not specified
  3. WebFlux Availability: Spring WebFlux dependencies are present

If these conditions are not met, the auto-configuration is skipped.
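
The property condition above can be read as a small predicate. The sketch below is a plain-Java model of `havingValue = "true"` combined with `matchIfMissing = true`. It is not Spring Boot's implementation: `EnabledGate` is a hypothetical name used only for illustration, and the case-insensitive comparison is an assumption of this sketch.

```java
import java.util.Map;

/** Hypothetical model of the spring.ai.mcp.client.enabled gate; not Spring Boot code. */
final class EnabledGate {

    static final String KEY = "spring.ai.mcp.client.enabled";

    /** True when the property is absent (matchIfMissing) or equals "true" (case ignored by assumption). */
    static boolean isEnabled(Map<String, String> properties) {
        String value = properties.get(KEY);
        return value == null || "true".equalsIgnoreCase(value);
    }

    public static void main(String[] args) {
        System.out.println("missing property -> " + isEnabled(Map.of()));
        System.out.println("enabled=false    -> " + isEnabled(Map.of(KEY, "false")));
    }
}
```

In practice this means the transport beans are skipped only when the property is explicitly set to something other than `true`.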

Required Dependencies

Maven:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>io.modelcontextprotocol.sdk</groupId>
    <artifactId>mcp-spring-webflux</artifactId>
</dependency>

Gradle:

implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'io.modelcontextprotocol.sdk:mcp-spring-webflux'

Beans Created

1. PropertiesMcpSseClientConnectionDetails

package org.springframework.ai.mcp.client.webflux.autoconfigure;

/**
 * Creates connection details bean from SSE properties.
 * Wraps properties for access via McpSseClientConnectionDetails interface.
 * Enables alternative connection details implementations.
 *
 * @param sseProperties SSE client properties from configuration
 * @return Connection details bean
 */
@org.springframework.context.annotation.Bean
@org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean(McpSseClientConnectionDetails.class)
org.springframework.ai.mcp.client.common.autoconfigure.PropertiesMcpSseClientConnectionDetails mcpSseClientConnectionDetails(
    org.springframework.ai.mcp.client.common.autoconfigure.properties.McpSseClientProperties sseProperties
);

Implementation:

package org.springframework.ai.mcp.client.common.autoconfigure;

/**
 * Default implementation of McpSseClientConnectionDetails.
 * Wraps SSE properties for access by transport creation.
 * Immutable after construction.
 * Thread-safe for reads.
 */
public class PropertiesMcpSseClientConnectionDetails implements McpSseClientConnectionDetails {

    private final org.springframework.ai.mcp.client.common.autoconfigure.properties.McpSseClientProperties properties;

    /**
     * Create connection details from properties.
     *
     * @param properties SSE client properties (never null)
     */
    public PropertiesMcpSseClientConnectionDetails(
        org.springframework.ai.mcp.client.common.autoconfigure.properties.McpSseClientProperties properties
    ) {
        this.properties = properties;
    }

    /**
     * Get configured connections.
     * Returns defensive copy to prevent modification.
     *
     * @return Map of connection names to SSE parameters (never null, may be empty)
     */
    @Override
    public java.util.Map<String, org.springframework.ai.mcp.client.common.autoconfigure.properties.McpSseClientProperties.SseParameters> getConnections() {
        return java.util.Map.copyOf(this.properties.getConnections());
    }
}
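
The "defensive copy" returned by getConnections() relies on the semantics of `Map.copyOf`. The following sketch shows those semantics in isolation, with plain `String` values standing in for `SseParameters`; `DefensiveCopyDemo` is a hypothetical class, not part of Spring AI.

```java
import java.util.HashMap;
import java.util.Map;

/** Demonstrates Map.copyOf semantics with String values standing in for SseParameters. */
final class DefensiveCopyDemo {

    /** Returns an unmodifiable snapshot; later changes to the source are not visible in it. */
    static Map<String, String> snapshot(Map<String, String> source) {
        return Map.copyOf(source);
    }

    /** Tries a write; returns true if the map rejects it. A mutable map would keep the probe entry. */
    static boolean rejectsWrites(Map<String, String> map) {
        try {
            map.put("probe", "value");
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        source.put("weather-api", "http://localhost:8080");
        Map<String, String> copy = snapshot(source);
        source.put("news-feed", "http://localhost:9000"); // not reflected in the snapshot
        System.out.println("entries=" + copy.size() + ", read-only=" + rejectsWrites(copy));
    }
}
```

Note that `Map.copyOf` throws `NullPointerException` for a null map or for null keys or values, so a connection entry with a null value would fail at this point rather than later.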

2. SSE WebFlux Client Transports

package org.springframework.ai.mcp.client.webflux.autoconfigure;

/**
 * Creates a list of WebFlux-based SSE transports for MCP communication.
 *
 * Each transport is configured with:
 * - A cloned WebClient.Builder with server-specific base URL
 * - ObjectMapper for JSON processing
 * - Server connection parameters from properties
 *
 * Process:
 * 1. Get WebClient.Builder (provided or default)
 * 2. Get ObjectMapper (provided or default)
 * 3. For each configured SSE connection:
 *    a. Clone WebClient.Builder
 *    b. Set base URL from connection parameters
 *    c. Create WebFluxSseClientTransport with builder and endpoint
 *    d. Wrap in NamedClientMcpTransport with connection name
 *
 * Thread-safe - uses immutable configurations.
 * Reactive - transports use Spring WebClient.
 *
 * @param connectionDetails SSE client properties containing server configurations
 * @param webClientBuilderProvider Provider for WebClient.Builder (uses WebClient.builder() if not provided)
 * @param objectMapperProvider Provider for ObjectMapper (creates new ObjectMapper() if not provided)
 * @return List of named MCP transports, one for each configured connection (never null, may be empty)
 * @throws IllegalArgumentException if connection configuration is invalid
 */
@org.springframework.context.annotation.Bean
public java.util.List<org.springframework.ai.mcp.client.common.autoconfigure.NamedClientMcpTransport> sseWebFluxClientTransports(
    org.springframework.ai.mcp.client.common.autoconfigure.McpSseClientConnectionDetails connectionDetails,
    org.springframework.beans.factory.ObjectProvider<org.springframework.web.reactive.function.client.WebClient.Builder> webClientBuilderProvider,
    org.springframework.beans.factory.ObjectProvider<com.fasterxml.jackson.databind.ObjectMapper> objectMapperProvider
);

Return Type: List<NamedClientMcpTransport>

Configuration

Basic Configuration

Configure SSE connections in application.yml:

spring.ai.mcp.client.sse:
  connections:
    weather-api:
      url: http://localhost:8080
    news-feed:
      url: http://localhost:9000
      sse-endpoint: /custom/sse

Properties format:

spring.ai.mcp.client.sse.connections.weather-api.url=http://localhost:8080
spring.ai.mcp.client.sse.connections.news-feed.url=http://localhost:9000
spring.ai.mcp.client.sse.connections.news-feed.sse-endpoint=/custom/sse

Configuration Properties Reference

See Configuration Properties for complete SSE configuration options.

Default Values

  • SSE Endpoint: /sse (used if sse-endpoint is not specified)
  • WebClient.Builder: Uses bean from application context, or creates WebClient.builder() if not available
  • ObjectMapper: Uses bean from application context, or creates new ObjectMapper() if not available
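
The endpoint default can be sketched as follows. This is a simplified plain-Java model rather than the starter's code: `SseEndpointDefaults` is a hypothetical helper, and the URI join assumes the base URL carries no path of its own. The actual resolution is performed by WebClient's URI handling and may differ for base URLs that include a path.

```java
import java.net.URI;

/** Hypothetical helper modelling the "/sse" default; not part of Spring AI. */
final class SseEndpointDefaults {

    static final String DEFAULT_SSE_ENDPOINT = "/sse";

    /** Returns the configured endpoint, or "/sse" when none is configured. */
    static String endpointOrDefault(String configured) {
        return configured != null ? configured : DEFAULT_SSE_ENDPOINT;
    }

    /** Joins base URL and endpoint; assumes the base URL has no path component. */
    static URI streamUri(String baseUrl, String configuredEndpoint) {
        return URI.create(baseUrl).resolve(endpointOrDefault(configuredEndpoint));
    }

    public static void main(String[] args) {
        System.out.println(streamUri("http://localhost:8080", null));
        System.out.println(streamUri("http://localhost:9000", "/custom/sse"));
    }
}
```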

Usage Examples

Injecting SSE Transports

import org.springframework.ai.mcp.client.common.autoconfigure.NamedClientMcpTransport;
import org.springframework.stereotype.Service;
import java.util.List;

/**
 * Service demonstrating SSE transport usage.
 * Typically you don't use transports directly - inject McpSyncClient or McpAsyncClient instead.
 * This example shows how to access transports for advanced scenarios.
 */
@Service
public class McpSseService {

    private final List<NamedClientMcpTransport> sseTransports;

    /**
     * Constructor injection of SSE transports.
     * If no SSE connections configured, list will be empty.
     *
     * @param sseTransports Auto-configured SSE transports
     */
    public McpSseService(List<NamedClientMcpTransport> sseTransports) {
        this.sseTransports = sseTransports;
    }

    /**
     * Look up a specific SSE transport by name.
     * Throws IllegalStateException if not found - check existence first in production.
     */
    public void connectToWeatherApi() {
        // Find the weather-api transport
        NamedClientMcpTransport weatherTransport = sseTransports.stream()
            .filter(t -> "weather-api".equals(t.name()))
            .findFirst()
            .orElseThrow(() -> new IllegalStateException("weather-api transport not found"));

        // Use the transport (typically via an MCP client)
        io.modelcontextprotocol.spec.McpClientTransport transport = weatherTransport.transport();
        // ... connect and use transport
    }

    /**
     * List all configured SSE connections.
     * Useful for debugging and monitoring.
     */
    public void listAllSseConnections() {
        sseTransports.forEach(t ->
            System.out.println("SSE Transport: " + t.name())
        );
    }
}
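
Where throwing on a missing connection is undesirable, one option is to index the transports by name once and return an `Optional`. The sketch below assumes that approach; `NamedTransport` is a stand-in record mirroring the `name()` and `transport()` accessors used above so that the example compiles without Spring AI, and `TransportRegistry` is a hypothetical class.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Stand-in for NamedClientMcpTransport so the sketch compiles without Spring AI. */
record NamedTransport(String name, Object transport) { }

/** Hypothetical registry that indexes transports by connection name. */
final class TransportRegistry {

    private final Map<String, NamedTransport> byName;

    TransportRegistry(List<NamedTransport> transports) {
        // toUnmodifiableMap throws IllegalStateException if two transports share a name.
        this.byName = transports.stream()
            .collect(Collectors.toUnmodifiableMap(NamedTransport::name, Function.identity()));
    }

    /** Returns the transport for the given connection name, or empty if none is configured. */
    Optional<NamedTransport> find(String name) {
        return Optional.ofNullable(byName.get(name));
    }

    public static void main(String[] args) {
        TransportRegistry registry = new TransportRegistry(
            List.of(new NamedTransport("weather-api", new Object())));
        System.out.println("weather-api present: " + registry.find("weather-api").isPresent());
        System.out.println("news-feed present:   " + registry.find("news-feed").isPresent());
    }
}
```

Callers can then decide how to react to a missing connection, for example by logging and skipping, instead of catching an exception.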

Filtering by Transport Type

If your application uses both SSE and Streamable HTTP transports, you can filter by checking the transport implementation type:

import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport;

/**
 * Filter transports by type.
 * Useful when multiple transport types are configured.
 *
 * @param allTransports All configured transports
 */
public void findSseTransports(List<NamedClientMcpTransport> allTransports) {
    List<NamedClientMcpTransport> sseTransports = allTransports.stream()
        .filter(t -> t.transport() instanceof WebFluxSseClientTransport)
        .toList();

    // Use only SSE transports
    sseTransports.forEach(t -> {
        System.out.println("SSE: " + t.name());
    });
}

Custom WebClient Configuration

To customize the WebClient.Builder used by all SSE transports:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
import io.netty.channel.ChannelOption;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import reactor.netty.http.client.HttpClient;
import java.time.Duration;

/**
 * Configuration for customizing WebClient used by SSE transports.
 * Applied to all SSE connections.
 */
@Configuration
public class WebClientConfig {

    /**
     * Custom WebClient.Builder with timeouts and headers.
     * Will be cloned for each SSE connection with connection-specific base URL.
     * Thread-safe - builder is cloned, not shared.
     *
     * @return Configured WebClient.Builder
     */
    @Bean
    public WebClient.Builder webClientBuilder() {
        // Configure HTTP client with custom timeouts
        HttpClient httpClient = HttpClient.create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 5 second connect timeout
            .responseTimeout(Duration.ofSeconds(30)); // max gap between reads; keep above the server's keepalive interval
        
        return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .defaultHeader("User-Agent", "MyApp-MCP-Client/1.0")
            .defaultHeader("Accept", "text/event-stream") // SSE MIME type
            .codecs(configurer -> configurer
                .defaultCodecs()
                .maxInMemorySize(16 * 1024 * 1024)) // 16MB buffer
            .filter((request, next) -> {
                // Add custom logging or filtering
                System.out.println("SSE Request: " + request.url());
                return next.exchange(request);
            });
    }
}

This WebClient.Builder will be cloned and customized for each SSE connection with its specific base URL.

Custom ObjectMapper Configuration

To customize JSON serialization for all SSE transports:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Configuration for JSON serialization in SSE transports.
 * Applied to all SSE connections.
 */
@Configuration
public class JacksonConfig {

    /**
     * Custom ObjectMapper for SSE JSON processing.
     * Configures date handling, unknown properties, etc.
     * Thread-safe - ObjectMapper is thread-safe after configuration.
     *
     * @return Configured ObjectMapper
     */
    @Bean
    public ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        
        // Register Java 8 date/time module
        mapper.registerModule(new JavaTimeModule());
        
        // Configure serialization
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        mapper.configure(SerializationFeature.INDENT_OUTPUT, false); // Compact for SSE
        
        // Configure deserialization
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // Lenient
        mapper.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
        
        return mapper;
    }
}

Transport Implementation Details

Internally, the auto-configuration creates SSE transports using the MCP SDK's builder pattern:

// Conceptual example (not for direct use) - shows the internal process.
// Assumes connections, webClientBuilderTemplate, objectMapper, and transports are already in scope.

// For each configured SSE connection:
for (Map.Entry<String, SseParameters> entry : connections.entrySet()) {
    String connectionName = entry.getKey();
    SseParameters params = entry.getValue();
    
    // Clone WebClient.Builder and set base URL
    var webClientBuilder = webClientBuilderTemplate.clone()
        .baseUrl(params.url());
    
    // Determine SSE endpoint (use default if not specified)
    String sseEndpoint = params.sseEndpoint() != null
        ? params.sseEndpoint()
        : "/sse";
    
    // Create transport
    var transport = io.modelcontextprotocol.client.transport.WebFluxSseClientTransport.builder(webClientBuilder)
        .sseEndpoint(sseEndpoint)
        .jsonMapper(new io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper(objectMapper))
        .build();
    
    // Wrap in named transport
    transports.add(new NamedClientMcpTransport(connectionName, transport));
}

Each configured connection gets its own transport instance, allowing independent configuration and lifecycle management.

SSE Protocol Details

  • MIME Type: text/event-stream
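  • Message Format: Each SSE message is a group of field lines such as event: and data:, terminated by a blank line
  • Keepalive: Servers commonly send periodic comment lines (lines starting with :) to keep the connection alive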
  • Reconnection: Automatic reconnection on connection loss (configurable)
  • Event Types: The MCP SSE transport uses an endpoint event to announce the URI for client POST requests and message events to carry JSON-RPC messages
  • Buffering: Messages buffered according to WebClient codec settings
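
To make the wire format concrete, the following is a minimal parser for a complete `text/event-stream` body. It is an illustrative sketch only: WebClient already decodes SSE, so application code does not need this. `SseEvent` and `SseFrameParser` are hypothetical names, the `id` and `retry` fields are ignored, and the `/messages` path in the example is made up.

```java
import java.util.ArrayList;
import java.util.List;

/** One parsed Server-Sent Event: its type and its data payload. */
record SseEvent(String event, String data) { }

/** Minimal SSE parser for illustration; WebClient performs this decoding in real applications. */
final class SseFrameParser {

    /** Parses a complete text/event-stream body into the events it contains. */
    static List<SseEvent> parse(String stream) {
        List<SseEvent> events = new ArrayList<>();
        String eventType = "message";              // default type when no event: field is sent
        StringBuilder data = new StringBuilder();
        boolean hasData = false;

        for (String line : stream.split("\r\n|\n|\r", -1)) {
            if (line.isEmpty()) {                  // a blank line dispatches the pending event
                if (hasData) {
                    events.add(new SseEvent(eventType, data.toString()));
                }
                eventType = "message";
                data.setLength(0);
                hasData = false;
            } else if (!line.startsWith(":")) {    // lines starting with ':' are comments (keepalives)
                int colon = line.indexOf(':');
                String field = colon < 0 ? line : line.substring(0, colon);
                String value = colon < 0 ? "" : line.substring(colon + 1);
                if (value.startsWith(" ")) {
                    value = value.substring(1);    // a single leading space is stripped
                }
                if (field.equals("event")) {
                    eventType = value;
                } else if (field.equals("data")) {
                    if (hasData) {
                        data.append('\n');         // multiple data lines are joined with a newline
                    }
                    data.append(value);
                    hasData = true;
                }
                // other fields (id, retry) are ignored in this sketch
            }
        }
        return events;
    }

    public static void main(String[] args) {
        String body = ": keepalive\n\nevent: endpoint\ndata: /messages\n\ndata: {\"jsonrpc\":\"2.0\"}\n\n";
        parse(body).forEach(System.out::println);
    }
}
```

The comment line at the start of the example body produces no event, which is why comment lines are a convenient keepalive mechanism.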

Connection Details Interface

The connection details abstraction allows for alternative configuration sources:

package org.springframework.ai.mcp.client.common.autoconfigure;

/**
 * Connection details interface for MCP SSE clients.
 * Extends Spring Boot's ConnectionDetails for integration with service connections.
 * Implementations must be thread-safe.
 * Allows alternative configuration sources (not just properties).
 */
public interface McpSseClientConnectionDetails
    extends org.springframework.boot.autoconfigure.service.connection.ConnectionDetails {

    /**
     * Returns the map of configured SSE connections.
     * Map keys are connection names, values are SSE parameters.
     * Returned map should be immutable or unmodifiable.
     *
     * @return Map of SSE connections (never null, may be empty)
     */
    java.util.Map<String, org.springframework.ai.mcp.client.common.autoconfigure.properties.McpSseClientProperties.SseParameters> getConnections();
}

You can provide a custom implementation of this interface to configure connections programmatically:

import org.springframework.ai.mcp.client.common.autoconfigure.McpSseClientConnectionDetails;
import org.springframework.ai.mcp.client.common.autoconfigure.properties.McpSseClientProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;

/**
 * Configuration providing custom SSE connection details.
 * Alternative to properties-based configuration.
 * Useful for dynamic configuration from databases, service discovery, etc.
 */
@Configuration
public class CustomConnectionConfig {

    /**
     * Provide custom SSE connection details.
     * Takes precedence over properties-based configuration.
     * Can load from database, service registry, etc.
     *
     * @return Custom connection details
     */
    @Bean
    public McpSseClientConnectionDetails customSseConnectionDetails() {
        return new McpSseClientConnectionDetails() {
            @Override
            public Map<String, McpSseClientProperties.SseParameters> getConnections() {
                // Load connections from custom source
                // This example shows hardcoded values, but you could query database, etc.
                return Map.of(
                    "dynamic-server",
                    new McpSseClientProperties.SseParameters(
                        discoverServerUrl(), // Dynamic URL discovery
                        "/custom/sse"
                    ),
                    "backup-server",
                    new McpSseClientProperties.SseParameters(
                        "http://backup-host:8080",
                        "/sse"
                    )
                );
            }
            
            private String discoverServerUrl() {
                // Service discovery logic
                return "http://dynamic-host:8080";
            }
        };
    }
}

Troubleshooting

No SSE Transports Created

If no SSE transports are created, check:

  1. Dependency: Ensure io.modelcontextprotocol.sdk:mcp-spring-webflux is on the classpath
  2. Configuration: Verify spring.ai.mcp.client.sse.connections is properly configured
  3. Enabled: Check spring.ai.mcp.client.enabled=true (or not set, as true is default)
  4. WebFlux: Ensure Spring WebFlux dependencies are present
  5. Logs: Enable debug logging: logging.level.org.springframework.ai.mcp=DEBUG

Connection Failures

If SSE connections fail:

  1. URL Validation: Verify the url values are correct and accessible
    • Test with curl: curl -N http://localhost:8080/sse
  2. Endpoint Path: Ensure the sse-endpoint matches the server's expected path
  3. Server Availability: Check that the MCP server is running and responding
  4. Network: Verify no firewalls or proxies are blocking SSE connections
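  5. CORS: CORS is enforced by browsers and does not apply to a server-side WebClient; it is relevant only if a browser-based client also connects to the same server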
  6. Content-Type: Server must send Content-Type: text/event-stream
  7. Timeouts: Check if connection or response timeouts are too short

SSE Stream Interruptions

If SSE streams are interrupted:

  1. Keepalive: Ensure server sends periodic keepalive messages
  2. Proxy Timeout: Check if proxy or load balancer has timeout
  3. Client Timeout: Increase response timeout in WebClient configuration
  4. Network Stability: Check network quality and stability
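  5. Reconnection: Check logs for reconnection attempts; WebClient does not re-establish a dropped SSE stream on its own, so any reconnection comes from the transport or from application-level retry logic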
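
Whether a dropped stream is re-established depends on the transport and on any retry logic wrapped around it. If retries are added at the application level, spacing the attempts avoids overloading a recovering server. The sketch below computes a capped exponential backoff schedule in plain Java. `ReconnectBackoff` is a hypothetical class; in a Reactor pipeline the same idea is usually expressed with `Retry.backoff` from `reactor.util.retry`.

```java
import java.time.Duration;

/** Hypothetical reconnect schedule: exponential backoff with an upper bound. */
final class ReconnectBackoff {

    private final long initialMillis;
    private final long maxMillis;

    ReconnectBackoff(Duration initial, Duration max) {
        this.initialMillis = initial.toMillis();
        this.maxMillis = max.toMillis();
        if (initialMillis < 1 || maxMillis < initialMillis) {
            throw new IllegalArgumentException("require 1ms <= initial <= max");
        }
    }

    /** Delay before the given retry attempt (0-based): initial doubled per attempt, capped at max. */
    Duration delayFor(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long delay = initialMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            // Compare against max/2 first so the doubling cannot overflow.
            delay = delay > maxMillis / 2 ? maxMillis : delay * 2;
        }
        return Duration.ofMillis(delay);
    }

    public static void main(String[] args) {
        ReconnectBackoff backoff = new ReconnectBackoff(Duration.ofSeconds(1), Duration.ofSeconds(30));
        for (int attempt = 0; attempt < 7; attempt++) {
            System.out.println("attempt " + attempt + " -> wait " + backoff.delayFor(attempt));
        }
    }
}
```

Production schedules often add random jitter so that many clients do not reconnect at the same instant; that is omitted here for brevity.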

Memory Issues

If experiencing memory problems with SSE:

  1. Buffer Size: Reduce maxInMemorySize in WebClient codec configuration
  2. Backpressure: Ensure proper backpressure handling in reactive streams
  3. Message Size: Check if individual SSE messages are very large
  4. Leak Detection: Enable Netty leak detection: -Dio.netty.leakDetection.level=ADVANCED
  5. Monitoring: Monitor heap usage and connection counts

Bean Conflicts

If you see bean definition conflicts:

  1. Ensure you're not manually defining beans with the same names
  2. Use @Primary or @Qualifier annotations to resolve ambiguity
  3. Consider excluding the auto-configuration and configuring manually if needed
  4. Check for multiple WebClient.Builder beans - qualify with @Qualifier if needed

Performance Tuning

Connection Pooling

WebFlux uses Reactor Netty with connection pooling by default:

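import java.time.Duration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;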

@Bean
public WebClient.Builder webClientBuilder() {
    ConnectionProvider provider = ConnectionProvider.builder("sse-pool")
        .maxConnections(100)        // Max total connections
        .maxIdleTime(Duration.ofSeconds(30)) // Idle timeout
        .maxLifeTime(Duration.ofMinutes(5))  // Max connection lifetime
        .pendingAcquireTimeout(Duration.ofSeconds(60)) // Wait timeout
        .evictInBackground(Duration.ofSeconds(120)) // Cleanup interval
        .build();
    
    HttpClient httpClient = HttpClient.create(provider);
    
    return WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(httpClient));
}

Backpressure Configuration

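Reactor applies backpressure through subscriber demand. On the WebClient side, the main tunable is the codec buffer limit, which caps how much data is buffered in memory while a message is decoded:

@Bean
public WebClient.Builder webClientBuilder() {
    return WebClient.builder()
        .codecs(configurer -> {
            // Logs form data (DEBUG) and headers (TRACE); both may contain sensitive data
            configurer.defaultCodecs().enableLoggingRequestDetails(true);
            configurer.defaultCodecs().maxInMemorySize(256 * 1024); // 256KB buffer
        });
}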

Netty Optimization

Optimize Netty for high-performance SSE:

@Bean
public WebClient.Builder webClientBuilder() {
    HttpClient httpClient = HttpClient.create()
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
        .responseTimeout(Duration.ofMinutes(5)) // Long timeout for SSE
        .compress(true); // Enable compression
    
    return WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(httpClient));
}

Security Considerations

TLS/SSL Configuration

For HTTPS SSE connections:

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

@Bean
public WebClient.Builder secureWebClientBuilder() throws Exception {
    SslContext sslContext = SslContextBuilder
        .forClient()
        // For production, use proper trust store:
        // .trustManager(trustManagerFactory)
        // For development/testing only:
        .trustManager(InsecureTrustManagerFactory.INSTANCE)
        .build();
    
    HttpClient httpClient = HttpClient.create()
        .secure(sslSpec -> sslSpec.sslContext(sslContext));
    
    return WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(httpClient));
}

Authentication

Add authentication headers:

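import org.springframework.web.reactive.function.client.ClientRequest;

@Bean
public WebClient.Builder authenticatedWebClientBuilder() {
    return WebClient.builder()
        .filter((request, next) -> {
            // Resolve the token per request so that a refreshed token is picked up.
            // getToken() is an application-provided method.
            String token = getToken();
            ClientRequest authenticated = ClientRequest.from(request)
                // setBearerAuth replaces any existing Authorization value instead of adding a second one
                .headers(headers -> headers.setBearerAuth(token))
                .build();
            return next.exchange(authenticated);
        });
}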
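
The example leaves `getToken()` undefined. One possible shape for it, sketched here under stated assumptions, is a small cache that fetches a new token shortly before the current one expires. `CachedToken`, its nested `Token` record, and the fetcher are all hypothetical; how a token is actually obtained depends on the identity provider in use.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

/** Hypothetical token holder that re-fetches a bearer token shortly before it expires. */
final class CachedToken {

    /** A token value together with its expiry time. */
    record Token(String value, Instant expiresAt) { }

    private final Supplier<Token> fetcher;      // obtains a fresh token, e.g. from an identity provider
    private final Duration refreshMargin;       // how long before expiry a refresh is triggered
    private final Supplier<Instant> now;        // injectable time source, useful for tests
    private Token current;

    CachedToken(Supplier<Token> fetcher, Duration refreshMargin, Supplier<Instant> now) {
        this.fetcher = fetcher;
        this.refreshMargin = refreshMargin;
        this.now = now;
    }

    /** Returns the cached token, fetching a new one when none is cached or it expires within the margin. */
    synchronized String get() {
        Instant deadline = now.get().plus(refreshMargin);
        if (current == null || !current.expiresAt().isAfter(deadline)) {
            current = fetcher.get();
        }
        return current.value();
    }

    public static void main(String[] args) {
        CachedToken cache = new CachedToken(
            () -> new Token("demo-token", Instant.now().plusSeconds(300)), // placeholder fetcher
            Duration.ofSeconds(30),
            Instant::now);
        System.out.println("Authorization: Bearer " + cache.get());
    }
}
```

Because `get()` is synchronized and may call the fetcher, a fetcher that performs blocking I/O should not be invoked on a Netty event-loop thread; in a reactive filter the refresh would typically be wrapped in a `Mono` instead.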
