The ClickHouse Testcontainers module provides R2DBC (Reactive Relational Database Connectivity) support for non-blocking, reactive database access. This enables integration with reactive frameworks like Spring WebFlux and Project Reactor.
Note: R2DBC support requires the org.testcontainers:r2dbc module and the ClickHouse R2DBC driver (com.clickhouse:clickhouse-r2dbc) as dependencies.
Required Dependencies:
org.testcontainers:clickhouse (this package)org.testcontainers:r2dbc (R2DBC module)com.clickhouse:clickhouse-r2dbc (ClickHouse R2DBC driver)Core Capabilities:
ClickHouseContainerKey Interfaces and Classes:
ClickHouseR2DBCDatabaseContainer - R2DBC wrapper: new ClickHouseR2DBCDatabaseContainer(ClickHouseContainer), getOptions(ClickHouseContainer), configure(ConnectionFactoryOptions), start(), stop()ClickHouseR2DBCDatabaseContainerProvider - R2DBC provider: supports(ConnectionFactoryOptions), createContainer(ConnectionFactoryOptions), getMetadata(ConnectionFactoryOptions)Default R2DBC Configuration:
clickhousehttp (uses ClickHouse HTTP interface)getHost()Threading Model:
start(), stop()) are blockingLifecycle:
start() and stop() delegate to wrapped containerExceptions:
IllegalArgumentException: Invalid parameters (null container, null options)IllegalStateException: Container not started when accessing port/hostClassNotFoundException: R2DBC driver not found on classpathContainerLaunchException: Container fails to startEdge Cases:
IllegalArgumentExceptionClassNotFoundException when creating connection factoryIllegalStateException when accessing port/hostIllegalArgumentException or use wrong providerThe ClickHouseR2DBCDatabaseContainer class wraps a standard ClickHouseContainer and provides R2DBC-specific functionality.
/**
* Wrap an existing ClickHouseContainer for R2DBC usage
* The container must be started before using R2DBC operations
* @param container ClickHouse container instance to wrap (cannot be null)
* @throws IllegalArgumentException if container is null
*/
public ClickHouseR2DBCDatabaseContainer(ClickHouseContainer container);Usage Examples:
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.clickhouse.ClickHouseR2DBCDatabaseContainer;
// Create base container
ClickHouseContainer container = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine");
container.start();
// Wrap for R2DBC usage
ClickHouseR2DBCDatabaseContainer r2dbcContainer = new ClickHouseR2DBCDatabaseContainer(container);Edge Cases:
IllegalArgumentExceptionGet R2DBC connection factory options configured for the container. Options are immutable and can be used to create connection factories.
/**
* Get R2DBC ConnectionFactoryOptions for the specified ClickHouse container
* Configures host, port (8123), database, user, password, and protocol (http)
* This is a static convenience method that does not require wrapper instantiation
* @param container ClickHouse container instance (cannot be null, must be started)
* @return Configured ConnectionFactoryOptions with all required options set
* @throws IllegalArgumentException if container is null
* @throws IllegalStateException if container is not started
*/
public static ConnectionFactoryOptions getOptions(ClickHouseContainer container);
/**
* Configure ConnectionFactoryOptions with container details
* Sets host, port (8123), database name, username, password, and protocol (http)
* Merges with existing options, container options take precedence
* @param options Base ConnectionFactoryOptions to configure (cannot be null)
* @return Configured ConnectionFactoryOptions with container details merged
* @throws IllegalArgumentException if options is null
* @throws IllegalStateException if wrapped container is not started
*/
public ConnectionFactoryOptions configure(ConnectionFactoryOptions options);Usage Examples:
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.clickhouse.ClickHouseR2DBCDatabaseContainer;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import reactor.core.publisher.Mono;
// Create and start container
ClickHouseContainer container = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine");
container.start();
// Get R2DBC connection options (static method)
ConnectionFactoryOptions options = ClickHouseR2DBCDatabaseContainer.getOptions(container);
// Create connection factory
ConnectionFactory connectionFactory = ConnectionFactories.get(options);
// Use connection factory with reactive streams
Mono.from(connectionFactory.create())
.flatMapMany(connection ->
Mono.from(connection.createStatement("SELECT 1").execute())
.flatMapMany(result -> result.map((row, metadata) -> row.get(0, Integer.class)))
)
.subscribe(value -> System.out.println("Result: " + value));Option Configuration Details:
DRIVER: Set to "clickhouse"PROTOCOL: Set to "http"HOST: Set to container host (from container.getHost())PORT: Set to HTTP port (8123, mapped port from container.getMappedPort(8123))DATABASE: Set to database name (from container.getDatabaseName())USER: Set to username (from container.getUsername())PASSWORD: Set to password (from container.getPassword())Edge Cases:
IllegalArgumentExceptionIllegalStateException when accessing port/hostManage the wrapped container's lifecycle. Lifecycle methods delegate to the wrapped container.
/**
* Start the wrapped ClickHouse container
* Blocks until container is healthy or timeout occurs
* @throws ContainerLaunchException if container fails to start
* @throws TimeoutException if startup timeout is exceeded
* @throws IllegalStateException if container is already started
*/
public void start();
/**
* Stop the wrapped ClickHouse container
* Stops and removes the container
* Safe to call multiple times (idempotent)
* @throws IllegalStateException if container was never started
*/
public void stop();Usage Examples:
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.clickhouse.ClickHouseR2DBCDatabaseContainer;
ClickHouseContainer container = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine");
ClickHouseR2DBCDatabaseContainer r2dbcContainer = new ClickHouseR2DBCDatabaseContainer(container);
// Start through R2DBC wrapper
r2dbcContainer.start();
// Use container...
// Stop through R2DBC wrapper
r2dbcContainer.stop();Lifecycle Notes:
start() and stop() delegate to wrapped containerIllegalStateExceptionIllegalStateExceptionThe ClickHouseR2DBCDatabaseContainerProvider class enables automatic container creation from R2DBC connection URLs. The provider is automatically discovered via Java's ServiceLoader mechanism.
/**
* Check if this provider supports the given R2DBC connection options
* Checks if the driver option matches "clickhouse"
* @param options ConnectionFactoryOptions to check (cannot be null)
* @return true if the driver option matches ClickHouse R2DBC driver, false otherwise
* @throws IllegalArgumentException if options is null
*/
public boolean supports(ConnectionFactoryOptions options);
/**
* Create a new R2DBC container from connection options
* Parses URL parameters (TC_IMAGE_TAG, TC_REUSABLE, etc.) from options
* @param options ConnectionFactoryOptions containing configuration (cannot be null)
* @return New R2DBCDatabaseContainer instance (ClickHouseR2DBCDatabaseContainer)
* @throws IllegalArgumentException if options is null or invalid
* @throws ContainerLaunchException if container fails to start
*/
public R2DBCDatabaseContainer createContainer(ConnectionFactoryOptions options);
/**
* Get connection metadata for the given options
* Automatically adds default user and password if not specified
* @param options ConnectionFactoryOptions to process (cannot be null)
* @return ConnectionFactoryMetadata with defaults applied
* @throws IllegalArgumentException if options is null
*/
public ConnectionFactoryMetadata getMetadata(ConnectionFactoryOptions options);r2dbc:tc:clickhouse:///[database]?[parameters]Components:
r2dbc:tc: - Testcontainers R2DBC URL prefix (required)clickhouse - Database type identifier (required, case-sensitive)database - Database name (optional, defaults to "default")parameters - Optional URL parametersCommon Parameters:
TC_IMAGE_TAG - Docker image tag to use (e.g., ?TC_IMAGE_TAG=24.12-alpine)TC_REUSABLE - Enable container reuse (e.g., ?TC_REUSABLE=true)TC_INITSCRIPT - Path to initialization script (e.g., ?TC_INITSCRIPT=init.sql)URL Parsing:
&Basic R2DBC URL usage:
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ClickHouseR2DBCUrlTest {
public void testWithR2dbcUrl() {
// Create connection factory from Testcontainers R2DBC URL
// Container is automatically created and started
String r2dbcUrl = "r2dbc:tc:clickhouse:///testdb?TC_IMAGE_TAG=24.12-alpine";
ConnectionFactory connectionFactory = ConnectionFactories.get(r2dbcUrl);
// Use reactive streams for database operations
Mono.from(connectionFactory.create())
.flatMapMany(connection ->
Mono.from(connection.createStatement("SELECT version()").execute())
.flatMapMany(result -> result.map((row, metadata) ->
row.get(0, String.class)))
.doFinally(signalType -> Mono.from(connection.close()).subscribe())
)
.subscribe(
version -> System.out.println("ClickHouse version: " + version),
error -> System.err.println("Error: " + error),
() -> System.out.println("Complete")
);
}
}R2DBC URL with multiple parameters:
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
public class ClickHouseR2DBCMultipleParamsTest {
public void testWithMultipleParams() {
String r2dbcUrl = "r2dbc:tc:clickhouse:///mydb?" +
"TC_IMAGE_TAG=24.12-alpine&" +
"TC_REUSABLE=true&" +
"TC_INITSCRIPT=schema.sql";
ConnectionFactory factory = ConnectionFactories.get(r2dbcUrl);
// Use factory...
}
}import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.clickhouse.ClickHouseR2DBCDatabaseContainer;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ClickHouseR2DBCExample {
public void basicR2dbcUsage() {
// Create and configure container
ClickHouseContainer container = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine")
.withDatabaseName("testdb")
.withUsername("user")
.withPassword("pass");
container.start();
try {
// Get R2DBC connection options
ConnectionFactoryOptions options = ClickHouseR2DBCDatabaseContainer.getOptions(container);
// Create connection factory
ConnectionFactory connectionFactory = ConnectionFactories.get(options);
// Perform reactive database operations
Mono.from(connectionFactory.create())
.flatMapMany(connection -> {
// Create table
Mono<Void> createTable = Mono.from(
connection.createStatement(
"CREATE TABLE IF NOT EXISTS users (" +
" id UInt32, " +
" name String" +
") ENGINE = MergeTree() ORDER BY id"
).execute()
).then();
// Insert data
Mono<Void> insertData = Mono.from(
connection.createStatement(
"INSERT INTO users (id, name) VALUES (?, ?)"
)
.bind(0, 1)
.bind(1, "Alice")
.execute()
).then();
// Query data
Flux<String> queryData = Mono.from(
connection.createStatement("SELECT name FROM users WHERE id = ?")
.bind(0, 1)
.execute()
)
.flatMapMany(result -> result.map((row, metadata) ->
row.get("name", String.class)));
// Chain operations
return createTable
.then(insertData)
.thenMany(queryData)
.doFinally(signalType -> Mono.from(connection.close()).subscribe());
})
.subscribe(
name -> System.out.println("User: " + name),
error -> System.err.println("Error: " + error),
() -> System.out.println("Complete")
);
// Wait for async operations (in test, use proper waiting mechanism)
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
container.stop();
}
}
}import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import reactor.core.publisher.Mono;
public class ClickHouseR2DBCUrlExample {
public void urlBasedR2dbcUsage() {
// Use Testcontainers R2DBC URL for automatic container management
String r2dbcUrl = "r2dbc:tc:clickhouse:///mydb?" +
"TC_IMAGE_TAG=24.12-alpine&" +
"TC_REUSABLE=true";
ConnectionFactory factory = ConnectionFactories.get(r2dbcUrl);
// Execute query using reactive streams
Mono.from(factory.create())
.flatMapMany(connection ->
Mono.from(connection.createStatement("SELECT 1 as result").execute())
.flatMapMany(result -> result.map((row, metadata) ->
row.get("result", Integer.class)))
.doFinally(signalType -> Mono.from(connection.close()).subscribe())
)
.subscribe(
value -> System.out.println("Result: " + value),
error -> System.err.println("Error: " + error),
() -> System.out.println("Complete")
);
}
}import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
@Configuration
@EnableR2dbcRepositories
public class R2dbcConfig extends AbstractR2dbcConfiguration {
@Bean
@Override
public ConnectionFactory connectionFactory() {
// Use Testcontainers R2DBC URL for integration tests
String r2dbcUrl = "r2dbc:tc:clickhouse:///testdb?" +
"TC_IMAGE_TAG=24.12-alpine&" +
"TC_REUSABLE=true";
return ConnectionFactories.get(r2dbcUrl);
}
}Spring Boot Test with Dynamic Properties:
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.clickhouse.ClickHouseR2DBCDatabaseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
@SpringBootTest
@Testcontainers
public class SpringWebFluxIntegrationTest {
@Container
static ClickHouseContainer clickhouse = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine");
@DynamicPropertySource
static void configureProperties(DynamicPropertyRegistry registry) {
ConnectionFactoryOptions options = ClickHouseR2DBCDatabaseContainer.getOptions(clickhouse);
registry.add("spring.r2dbc.url", () -> "r2dbc:clickhouse://" +
options.getRequiredValue(ConnectionFactoryOptions.HOST) + ":" +
options.getRequiredValue(ConnectionFactoryOptions.PORT) + "/" +
options.getRequiredValue(ConnectionFactoryOptions.DATABASE));
registry.add("spring.r2dbc.username", () ->
options.getRequiredValue(ConnectionFactoryOptions.USER));
registry.add("spring.r2dbc.password", () ->
options.getRequiredValue(ConnectionFactoryOptions.PASSWORD));
}
// Tests use Spring-managed R2DBC ConnectionFactory
}import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.clickhouse.ClickHouseR2DBCDatabaseContainer;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
public class ReactorExample {
public void reactiveStreamProcessing() {
ClickHouseContainer container = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine");
container.start();
try {
ConnectionFactoryOptions options = ClickHouseR2DBCDatabaseContainer.getOptions(container);
ConnectionFactory factory = ConnectionFactories.get(options);
// Process data reactively with backpressure
Flux.usingWhen(
Mono.from(factory.create()), // Create connection
connection -> {
// Create table and insert test data
return Mono.from(
connection.createStatement(
"CREATE TABLE IF NOT EXISTS numbers (value UInt32) ENGINE = Memory"
).execute()
)
.then(Mono.from(
connection.createStatement(
"INSERT INTO numbers SELECT number FROM numbers(100)"
).execute()
))
.thenMany(
// Query data as stream
Mono.from(
connection.createStatement("SELECT value FROM numbers").execute()
)
.flatMapMany(result ->
result.map((row, metadata) -> row.get("value", Integer.class))
)
);
},
connection -> Mono.from(connection.close()) // Close connection
)
.filter(value -> value % 2 == 0) // Filter even numbers
.map(value -> value * 2) // Transform
.buffer(10) // Batch processing
.delayElements(Duration.ofMillis(100)) // Rate limiting
.subscribe(
batch -> System.out.println("Processed batch: " + batch),
error -> System.err.println("Error: " + error),
() -> System.out.println("Processing complete")
);
// Keep application running for async operations
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
container.stop();
}
}
}import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
public class R2DBCErrorHandlingExample {
public void handleErrors() {
String r2dbcUrl = "r2dbc:tc:clickhouse:///testdb?TC_IMAGE_TAG=24.12-alpine";
ConnectionFactory factory = ConnectionFactories.get(r2dbcUrl);
Mono.from(factory.create())
.flatMapMany(connection ->
Mono.from(connection.createStatement("SELECT * FROM nonexistent_table").execute())
.flatMapMany(result -> result.map((row, metadata) -> row))
)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
.onErrorResume(error -> {
System.err.println("Error after retries: " + error.getMessage());
return Mono.empty();
})
.doFinally(signalType -> {
// Cleanup
})
.subscribe();
}
}The R2DBC support automatically configures:
clickhouse)http (uses ClickHouse HTTP interface)getHost() (typically "localhost")Connection Factory Options:
To use R2DBC support, add the following dependencies:
Maven:
<!-- Testcontainers R2DBC module -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>r2dbc</artifactId>
<version>1.21.4</version>
<scope>test</scope>
</dependency>
<!-- ClickHouse R2DBC driver -->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-r2dbc</artifactId>
<version>0.7.2</version>
<classifier>http</classifier>
<scope>test</scope>
</dependency>
<!-- Project Reactor (usually provided transitively) -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.0</version>
<scope>test</scope>
</dependency>Gradle:
testImplementation "org.testcontainers:r2dbc:1.21.4"
testImplementation "com.clickhouse:clickhouse-r2dbc:0.7.2:http"
testImplementation "io.projectreactor:reactor-core:3.5.0"Missing Dependencies:
// If R2DBC module is not on classpath
ConnectionFactoryOptions options = ClickHouseR2DBCDatabaseContainer.getOptions(container);
// No error at this point, error occurs when creating connection factory
ConnectionFactory factory = ConnectionFactories.get(options);
// Throws ClassNotFoundException if R2DBC driver not foundContainer Not Started:
ClickHouseContainer container = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine");
// Container not started
ConnectionFactoryOptions options = ClickHouseR2DBCDatabaseContainer.getOptions(container);
// Throws IllegalStateException when accessing port/hostInvalid R2DBC URL:
// Missing database type
String url = "r2dbc:tc:///testdb";
ConnectionFactory factory = ConnectionFactories.get(url);
// Throws IllegalArgumentException or uses wrong providerConnection Failures:
// Container stops while connection is active
Mono.from(factory.create())
.flatMapMany(connection -> {
container.stop(); // Stop container
return Mono.from(connection.createStatement("SELECT 1").execute())
.flatMapMany(result -> result.map((row, metadata) -> row));
})
.subscribe(
value -> {},
error -> {
// Error: connection lost, container stopped
System.err.println("Connection error: " + error.getMessage());
}
);Reactive Stream Errors:
// Errors in reactive streams must be handled
Mono.from(factory.create())
.flatMapMany(connection ->
Mono.from(connection.createStatement("SELECT 1").execute())
.flatMapMany(result -> result.map((row, metadata) -> row))
)
.subscribe(
value -> System.out.println("Value: " + value),
error -> System.err.println("Error: " + error), // Must handle errors
() -> System.out.println("Complete")
);Connection Not Closed:
// Connections must be closed explicitly
Mono.from(factory.create())
.flatMapMany(connection -> {
// Use connection...
return Flux.just("data");
// Connection not closed - resource leak!
})
.subscribe();
// Correct: Close connection
Mono.from(factory.create())
.flatMapMany(connection ->
Flux.just("data")
.doFinally(signalType -> Mono.from(connection.close()).subscribe())
)
.subscribe();