The Testcontainers PostgreSQL module provides full support for R2DBC (Reactive Relational Database Connectivity), enabling non-blocking, reactive access to PostgreSQL databases in tests. This is essential for testing reactive applications built with Project Reactor, Spring WebFlux, or other reactive frameworks.
Wrapper class that adapts a PostgreSQLContainer for use with R2DBC connection factories.
/**
* R2DBC database container implementation for PostgreSQL
* Wraps a PostgreSQLContainer to provide R2DBC connectivity
*/
public class PostgreSQLR2DBCDatabaseContainer implements R2DBCDatabaseContainer {
/**
* Create an R2DBC container wrapper around an existing PostgreSQL container
* @param container The PostgreSQL container to wrap (must not be null)
* @throws IllegalArgumentException if container is null
*/
public PostgreSQLR2DBCDatabaseContainer(PostgreSQLContainer<?> container);
/**
* Get R2DBC ConnectionFactoryOptions from a PostgreSQL container
* Converts JDBC connection details to R2DBC format
* @param container PostgreSQL container instance (must not be null)
* @return ConnectionFactoryOptions configured for R2DBC
* @throws IllegalArgumentException if container is null
* @throws IllegalStateException if container is not started
*/
public static ConnectionFactoryOptions getOptions(PostgreSQLContainer<?> container);
/**
* Configure additional R2DBC connection factory options
* @param options Base ConnectionFactoryOptions to configure (must not be null)
* @return Configured ConnectionFactoryOptions with PostgreSQL settings
* @throws IllegalArgumentException if options is null
*/
public ConnectionFactoryOptions configure(ConnectionFactoryOptions options);
/**
* Start the underlying PostgreSQL container
* Delegates to the wrapped PostgreSQLContainer
* @throws ContainerLaunchException if container fails to start
*/
public void start();
/**
* Stop the underlying PostgreSQL container
* Delegates to the wrapped PostgreSQLContainer
*/
public void stop();
}Factory class for creating R2DBC containers from R2DBC connection URLs. Registered via Java SPI.
/**
* Provider for creating PostgreSQL R2DBC containers from connection options
* Registered via Java SPI (Service Provider Interface)
*/
public class PostgreSQLR2DBCDatabaseContainerProvider
implements R2DBCDatabaseContainerProvider {
/**
* Check if this provider supports the given R2DBC connection options
* Returns true if the options specify "postgresql" as the driver
* @param options R2DBC ConnectionFactoryOptions (must not be null)
* @return true if driver is "postgresql", false otherwise
* @throws IllegalArgumentException if options is null
*/
public boolean supports(ConnectionFactoryOptions options);
/**
* Create a new R2DBC container from connection factory options
* Extracts database configuration from options and creates container
* @param options R2DBC ConnectionFactoryOptions (must not be null)
* @return Configured R2DBCDatabaseContainer
* @throws IllegalArgumentException if options is null or invalid
* @throws ContainerLaunchException if container fails to start
*/
public R2DBCDatabaseContainer createContainer(ConnectionFactoryOptions options);
/**
* Get metadata about the connection factory
* @param options R2DBC ConnectionFactoryOptions (must not be null)
* @return Connection metadata
* @throws IllegalArgumentException if options is null
*/
public ConnectionFactoryMetadata getMetadata(ConnectionFactoryOptions options);
}import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.PostgreSQLR2DBCDatabaseContainer;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.ConnectionFactories;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class R2DBCTest {
public void testWithR2DBC() {
// Create standard PostgreSQL container
PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15");
postgres.start();
// Get R2DBC connection options
ConnectionFactoryOptions options =
PostgreSQLR2DBCDatabaseContainer.getOptions(postgres);
// Create R2DBC connection factory
ConnectionFactory connectionFactory =
ConnectionFactories.get(options);
// Use reactive API
Mono.from(connectionFactory.create())
.flatMapMany(connection ->
Flux.from(connection.createStatement("SELECT 1")
.execute())
.flatMap(result ->
result.map((row, metadata) ->
row.get(0, Integer.class)))
.doFinally(signalType -> connection.close())
)
.subscribe(value -> {
// value is 1
});
postgres.stop();
}
}import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.PostgreSQLR2DBCDatabaseContainer;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.ConnectionFactories;
public class R2DBCWrapperTest {
public void testWithWrapper() {
// Create PostgreSQL container
try (PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15")) {
postgres.start();
// Wrap in R2DBC container
PostgreSQLR2DBCDatabaseContainer r2dbcContainer =
new PostgreSQLR2DBCDatabaseContainer(postgres);
// Get connection options
ConnectionFactoryOptions options =
PostgreSQLR2DBCDatabaseContainer.getOptions(postgres);
// Create connection factory
ConnectionFactory connectionFactory =
ConnectionFactories.get(options);
// Use reactive queries
// ...
}
}
}The R2DBC provider supports automatic container creation from R2DBC URLs using the r2dbc:tc: scheme.
R2DBC URL Format:
r2dbc:tc:postgresql:///<database>?TC_IMAGE_TAG=<version>Usage Example:
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.Option;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class R2DBCUrlTest {
public void testWithR2dbcUrl() {
// Container automatically managed via URL
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
.option(ConnectionFactoryOptions.DRIVER, "tc")
.option(ConnectionFactoryOptions.PROTOCOL, "postgresql")
.option(ConnectionFactoryOptions.DATABASE, "testdb")
.option(Option.valueOf("TC_IMAGE_TAG"), "15")
.build();
ConnectionFactory connectionFactory = ConnectionFactories.get(options);
// Or using URL string
String r2dbcUrl = "r2dbc:tc:postgresql:///testdb?TC_IMAGE_TAG=15";
ConnectionFactory factory = ConnectionFactories.get(r2dbcUrl);
// Use reactive API
Mono.from(factory.create())
.flatMapMany(connection ->
Flux.from(connection.createStatement("SELECT version()")
.execute())
.flatMap(result ->
result.map((row, metadata) ->
row.get(0, String.class)))
.doFinally(signalType -> connection.close())
)
.subscribe(version -> {
// PostgreSQL version string
});
}
}import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.PostgreSQLR2DBCDatabaseContainer;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import reactor.core.publisher.Mono;
import java.time.Duration;
public class R2DBCPoolTest {
public void testWithConnectionPool() {
try (PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15")) {
postgres.start();
// Get R2DBC options
ConnectionFactoryOptions options =
PostgreSQLR2DBCDatabaseContainer.getOptions(postgres);
// Create base connection factory
ConnectionFactory connectionFactory =
ConnectionFactories.get(options);
// Configure connection pool
ConnectionPoolConfiguration poolConfig = ConnectionPoolConfiguration.builder()
.connectionFactory(connectionFactory)
.maxIdleTime(Duration.ofMinutes(30))
.maxSize(20)
.maxCreateConnectionTime(Duration.ofSeconds(10))
.build();
// Create pooled connection factory
ConnectionPool pool = new ConnectionPool(poolConfig);
// Use pooled connections
Mono.from(pool.create())
.flatMap(connection ->
Mono.from(connection.createStatement("SELECT 1")
.execute())
.flatMap(result ->
Mono.from(result.map((row, metadata) ->
row.get(0, Integer.class))))
.doFinally(signalType -> connection.close())
)
.block();
// Close pool
pool.dispose();
}
}
}import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.PostgreSQLR2DBCDatabaseContainer;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
@SpringBootTest
@EnableR2dbcRepositories
public class SpringDataR2DBCTest {
@TestConfiguration
static class TestConfig extends AbstractR2dbcConfiguration {
@Bean(initMethod = "start", destroyMethod = "stop")
public PostgreSQLContainer<?> postgresContainer() {
return new PostgreSQLContainer<>("postgres:15")
.withDatabaseName("testdb")
.withUsername("testuser")
.withPassword("testpass");
}
@Bean
@Override
public ConnectionFactory connectionFactory() {
PostgreSQLContainer<?> container = postgresContainer();
ConnectionFactoryOptions options =
PostgreSQLR2DBCDatabaseContainer.getOptions(container);
return ConnectionFactories.get(options);
}
}
@Autowired
private R2dbcEntityTemplate template;
@Test
void testReactiveQuery() {
Flux<User> users = template
.select(User.class)
.all();
StepVerifier.create(users)
.expectNextCount(0)
.verifyComplete();
}
}import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.PostgreSQLR2DBCDatabaseContainer;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.Option;
import static io.r2dbc.spi.ConnectionFactoryOptions.*;
public class R2DBCCustomConfigTest {
public void testCustomConfiguration() {
try (PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15")
.withDatabaseName("mydb")
.withUsername("myuser")
.withPassword("mypass")) {
postgres.start();
// Build custom R2DBC options
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
.option(DRIVER, "postgresql")
.option(HOST, postgres.getHost())
.option(PORT, postgres.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT))
.option(DATABASE, postgres.getDatabaseName())
.option(USER, postgres.getUsername())
.option(PASSWORD, postgres.getPassword())
.option(Option.valueOf("sslMode"), "disable")
.option(Option.valueOf("schema"), "public")
.option(Option.valueOf("applicationName"), "TestApp")
.build();
ConnectionFactory connectionFactory = ConnectionFactories.get(options);
// Use connection factory
// ...
}
}
}import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.PostgreSQLR2DBCDatabaseContainer;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
public class R2DBCCrudTest {
public void testCrudOperations() {
try (PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15")
.withInitScript("schema.sql")) {
postgres.start();
ConnectionFactory connectionFactory = ConnectionFactories.get(
PostgreSQLR2DBCDatabaseContainer.getOptions(postgres)
);
// Create table
Mono<Void> createTable = Mono.from(connectionFactory.create())
.flatMap(connection ->
Mono.from(connection.createStatement(
"CREATE TABLE IF NOT EXISTS users (" +
"id SERIAL PRIMARY KEY, " +
"username VARCHAR(50), " +
"email VARCHAR(100))"
).execute())
.then(Mono.from(connection.close()))
);
// Insert data
Mono<Integer> insert = Mono.from(connectionFactory.create())
.flatMap(connection ->
Mono.from(connection.createStatement(
"INSERT INTO users (username, email) VALUES ($1, $2)"
)
.bind("$1", "alice")
.bind("$2", "alice@example.com")
.returnGeneratedValues("id")
.execute())
.flatMap(result ->
Mono.from(result.map((row, metadata) ->
row.get("id", Integer.class))))
.doFinally(signalType -> connection.close())
);
// Query data
Flux<String> query = Mono.from(connectionFactory.create())
.flatMapMany(connection ->
Flux.from(connection.createStatement(
"SELECT username FROM users WHERE email = $1"
)
.bind("$1", "alice@example.com")
.execute())
.flatMap(result ->
result.map((row, metadata) ->
row.get("username", String.class)))
.doFinally(signalType -> connection.close())
);
// Execute operations
StepVerifier.create(createTable.then(insert))
.expectNext(1)
.verifyComplete();
StepVerifier.create(query)
.expectNext("alice")
.verifyComplete();
}
}
}When using PostgreSQLR2DBCDatabaseContainer.getOptions(), the following options are automatically configured:
DRIVER: Set to "postgresql"HOST: Container host addressPORT: Mapped PostgreSQL portDATABASE: Configured database nameUSER: Configured usernamePASSWORD: Configured passwordAdditional options can be added by building on top of the base options:
ConnectionFactoryOptions baseOptions =
PostgreSQLR2DBCDatabaseContainer.getOptions(postgres);
ConnectionFactoryOptions customOptions = ConnectionFactoryOptions.builder()
.from(baseOptions)
.option(Option.valueOf("sslMode"), "require")
.option(Option.valueOf("connectTimeout"), Duration.ofSeconds(30))
.option(Option.valueOf("schema"), "myschema")
.build();The PostgreSQLR2DBCDatabaseContainerProvider is automatically registered via Java SPI. The registration is defined in:
META-INF/services/org.testcontainers.r2dbc.R2DBCDatabaseContainerProviderThis enables automatic detection when using R2DBC URLs with the r2dbc:tc:postgresql: scheme.
To use R2DBC support, include these dependencies:
Maven:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.21.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>r2dbc</artifactId>
<version>1.21.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>1.0.2.RELEASE</version>
<scope>test</scope>
</dependency>Gradle:
testImplementation 'org.testcontainers:postgresql:1.21.4'
testImplementation 'org.testcontainers:r2dbc:1.21.4'
testImplementation 'org.postgresql:r2dbc-postgresql:1.0.2.RELEASE'Always ensure connections are properly closed:
Mono.from(connectionFactory.create())
.flatMap(connection ->
// Use connection
Mono.from(connection.createStatement("SELECT 1").execute())
.flatMap(result -> Mono.from(result.getRowsUpdated()))
.doFinally(signalType -> connection.close()) // Always close
)
.subscribe();Handle R2DBC errors appropriately:
import io.r2dbc.spi.R2dbcException;
import org.springframework.r2dbc.BadSqlGrammarException;
Mono.from(connectionFactory.create())
.flatMap(connection ->
Flux.from(connection.createStatement("SELECT * FROM users").execute())
.flatMap(result ->
result.map((row, metadata) -> row.get("username", String.class)))
.onErrorResume(R2dbcException.class, e -> {
// Handle R2DBC errors
System.err.println("R2DBC error: " + e.getMessage());
return Flux.empty();
})
.doFinally(signalType -> connection.close())
)
.subscribe();Use Project Reactor's StepVerifier for testing:
import reactor.test.StepVerifier;
Flux<String> query = Mono.from(connectionFactory.create())
.flatMapMany(connection ->
Flux.from(connection.createStatement("SELECT username FROM users").execute())
.flatMap(result ->
result.map((row, metadata) -> row.get("username", String.class)))
.doFinally(signalType -> connection.close())
);
StepVerifier.create(query)
.expectNext("alice", "bob")
.verifyComplete();Handling connection errors:
try (PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15")) {
postgres.start();
ConnectionFactoryOptions options =
PostgreSQLR2DBCDatabaseContainer.getOptions(postgres);
ConnectionFactory connectionFactory = ConnectionFactories.get(options);
Mono.from(connectionFactory.create())
.flatMap(connection ->
Mono.from(connection.createStatement("SELECT 1").execute())
.doOnError(error -> {
System.err.println("R2DBC connection error: " + error.getMessage());
System.err.println("Container logs: " + postgres.getLogs());
})
.doFinally(signalType -> connection.close())
)
.onErrorResume(error -> {
// Handle connection errors
if (error.getMessage().contains("Connection refused")) {
System.err.println("Container may not be ready yet");
}
return Mono.error(error);
})
.block();
}Configuring timeouts:
import java.time.Duration;
try (PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15")
.withStartupTimeoutSeconds(180)) {
postgres.start();
ConnectionFactoryOptions baseOptions =
PostgreSQLR2DBCDatabaseContainer.getOptions(postgres);
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
.from(baseOptions)
.option(Option.valueOf("connectTimeout"), Duration.ofSeconds(30))
.build();
ConnectionFactory connectionFactory = ConnectionFactories.get(options);
// Use with timeout
Mono.from(connectionFactory.create())
.timeout(Duration.ofSeconds(10))
.flatMap(connection ->
Mono.from(connection.createStatement("SELECT 1").execute())
.doFinally(signalType -> connection.close())
)
.block();
}Ensuring proper cleanup:
try (PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15")) {
postgres.start();
ConnectionFactory connectionFactory = ConnectionFactories.get(
PostgreSQLR2DBCDatabaseContainer.getOptions(postgres)
);
// Always close connections
Mono.from(connectionFactory.create())
.flatMap(connection ->
Flux.from(connection.createStatement("SELECT 1").execute())
.flatMap(result ->
result.map((row, metadata) -> row.get(0, Integer.class)))
.doFinally(signalType -> {
// Ensure connection is closed
connection.close().subscribe();
})
)
.blockLast();
}Retry logic:
import reactor.util.retry.Retry;
import io.r2dbc.spi.R2dbcException;
try (PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15")) {
postgres.start();
ConnectionFactory connectionFactory = ConnectionFactories.get(
PostgreSQLR2DBCDatabaseContainer.getOptions(postgres)
);
Mono.from(connectionFactory.create())
.flatMap(connection ->
Flux.from(connection.createStatement("SELECT 1").execute())
.flatMap(result ->
result.map((row, metadata) -> row.get(0, Integer.class)))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.filter(error -> error instanceof R2dbcException))
.doFinally(signalType -> connection.close())
)
.blockLast();
}Handling pool exhaustion:
try (PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15")) {
postgres.start();
ConnectionFactoryOptions options =
PostgreSQLR2DBCDatabaseContainer.getOptions(postgres);
ConnectionFactory baseFactory = ConnectionFactories.get(options);
ConnectionPoolConfiguration poolConfig = ConnectionPoolConfiguration.builder()
.connectionFactory(baseFactory)
.maxSize(5) // Small pool for testing
.maxIdleTime(Duration.ofMinutes(10))
.maxCreateConnectionTime(Duration.ofSeconds(10))
.build();
ConnectionPool pool = new ConnectionPool(poolConfig);
try {
// Use pool
Mono.from(pool.create())
.flatMap(connection ->
Mono.from(connection.createStatement("SELECT 1").execute())
.doFinally(signalType -> connection.close())
)
.block();
} catch (Exception e) {
if (e.getMessage().contains("pool") || e.getMessage().contains("timeout")) {
System.err.println("Connection pool exhausted. Consider:");
System.err.println("1. Increasing pool size");
System.err.println("2. Reducing connection timeout");
System.err.println("3. Checking for connection leaks");
}
throw e;
} finally {
pool.dispose();
}
}Validating connection options:
try (PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15")) {
postgres.start();
ConnectionFactoryOptions options =
PostgreSQLR2DBCDatabaseContainer.getOptions(postgres);
// Verify required options are present
assert options.hasOption(ConnectionFactoryOptions.DRIVER);
assert options.hasOption(ConnectionFactoryOptions.HOST);
assert options.hasOption(ConnectionFactoryOptions.PORT);
assert options.hasOption(ConnectionFactoryOptions.DATABASE);
assert options.hasOption(ConnectionFactoryOptions.USER);
assert options.hasOption(ConnectionFactoryOptions.PASSWORD);
ConnectionFactory connectionFactory = ConnectionFactories.get(options);
// Test connection
Mono.from(connectionFactory.create())
.flatMap(connection ->
Mono.from(connection.createStatement("SELECT 1").execute())
.flatMap(result ->
Mono.from(result.map((row, metadata) -> row.get(0, Integer.class))))
.doFinally(signalType -> connection.close())
)
.block();
}PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15");
try {
// Error: Container not started yet
ConnectionFactoryOptions options =
PostgreSQLR2DBCDatabaseContainer.getOptions(postgres);
} catch (IllegalStateException e) {
// Container must be started first
postgres.start();
ConnectionFactoryOptions options =
PostgreSQLR2DBCDatabaseContainer.getOptions(postgres);
}