
R2DBC Support API

The ClickHouse Testcontainers module provides R2DBC (Reactive Relational Database Connectivity) support for non-blocking, reactive database access. This enables integration with reactive frameworks like Spring WebFlux and Project Reactor.

Note: R2DBC support requires the org.testcontainers:r2dbc module and the ClickHouse R2DBC driver (com.clickhouse:clickhouse-r2dbc) as dependencies.

Key Information for Agents

Required Dependencies:

  • org.testcontainers:clickhouse (this package)
  • org.testcontainers:r2dbc (R2DBC module)
  • com.clickhouse:clickhouse-r2dbc (ClickHouse R2DBC driver)
  • Project Reactor (for reactive streams)

Core Capabilities:

  • R2DBC container wrapper for existing ClickHouseContainer
  • Static method for getting connection factory options
  • R2DBC provider for URL-based container instantiation
  • Automatic configuration of connection factory options
  • Reactive database operations with non-blocking I/O

Key Interfaces and Classes:

  • ClickHouseR2DBCDatabaseContainer - R2DBC wrapper: new ClickHouseR2DBCDatabaseContainer(ClickHouseContainer), getOptions(ClickHouseContainer), configure(ConnectionFactoryOptions), start(), stop()
  • ClickHouseR2DBCDatabaseContainerProvider - R2DBC provider: supports(ConnectionFactoryOptions), createContainer(ConnectionFactoryOptions), getMetadata(ConnectionFactoryOptions)

Default R2DBC Configuration:

  • Driver: clickhouse
  • Protocol: http (uses ClickHouse HTTP interface)
  • Port: 8123 inside the container (HTTP port); clients connect through the mapped host port
  • Host: Container host from getHost()
  • Database: From container configuration (default: "default")
  • User: From container configuration (default: "test")
  • Password: From container configuration (default: "test")

Threading Model:

  • R2DBC operations are non-blocking and asynchronous
  • Connection factory operations return reactive types (Mono, Flux)
  • Container lifecycle methods (start(), stop()) are blocking
  • Multiple reactive streams can use the same connection factory concurrently
  • Connection factory is thread-safe
  • Container instances are not thread-safe for concurrent modification

Lifecycle:

  • Container must be started before R2DBC operations (not enforced at construction)
  • Wrapper does not start container automatically
  • start() and stop() delegate to wrapped container
  • Container lifecycle is independent of connection factory lifecycle
  • Connections are created on-demand from connection factory
  • Connections must be closed explicitly or via reactive stream completion

Exceptions:

  • IllegalArgumentException: Invalid parameters (null container, null options)
  • IllegalStateException: Container not started when accessing port/host
  • IllegalStateException (from ConnectionFactories.get): No R2DBC driver on the classpath supports the options
  • ContainerLaunchException: Container fails to start
  • Reactive stream errors: Errors propagate through reactive streams (Mono, Flux)

Edge Cases:

  • Null container throws IllegalArgumentException
  • Container must be started before R2DBC operations (not enforced at construction)
  • Options are immutable (modifications return new instance)
  • Existing options in input are preserved, container options override
  • Missing ClickHouse R2DBC driver on classpath causes ConnectionFactories.get to fail with IllegalStateException when creating connection factory
  • Container not started throws IllegalStateException when accessing port/host
  • Invalid R2DBC URLs throw IllegalArgumentException or use wrong provider
  • Container stops while connection is active causes connection errors in reactive streams
  • Connection factory is thread-safe, but individual connections may not be
  • Reactive streams must be properly subscribed to execute
  • Errors in reactive streams must be handled via error callbacks

Capabilities

R2DBC Container Wrapper

The ClickHouseR2DBCDatabaseContainer class wraps a standard ClickHouseContainer and provides R2DBC-specific functionality.

/**
 * Wrap an existing ClickHouseContainer for R2DBC usage
 * The container must be started before using R2DBC operations
 * @param container ClickHouse container instance to wrap (cannot be null)
 * @throws IllegalArgumentException if container is null
 */
public ClickHouseR2DBCDatabaseContainer(ClickHouseContainer container);

Usage Examples:

import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.clickhouse.ClickHouseR2DBCDatabaseContainer;

// Create base container
ClickHouseContainer container = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine");
container.start();

// Wrap for R2DBC usage
ClickHouseR2DBCDatabaseContainer r2dbcContainer = new ClickHouseR2DBCDatabaseContainer(container);

Edge Cases:

  • Null container throws IllegalArgumentException
  • Container must be started before R2DBC operations (not enforced at construction)
  • Wrapper does not start the container automatically
  • Wrapper delegates lifecycle methods to wrapped container

Connection Options

Get R2DBC connection factory options configured for the container. Options are immutable and can be used to create connection factories.

/**
 * Get R2DBC ConnectionFactoryOptions for the specified ClickHouse container
 * Configures host, port (8123), database, user, password, and protocol (http)
 * This is a static convenience method that does not require wrapper instantiation
 * @param container ClickHouse container instance (cannot be null, must be started)
 * @return Configured ConnectionFactoryOptions with all required options set
 * @throws IllegalArgumentException if container is null
 * @throws IllegalStateException if container is not started
 */
public static ConnectionFactoryOptions getOptions(ClickHouseContainer container);

/**
 * Configure ConnectionFactoryOptions with container details
 * Sets host, port (8123), database name, username, password, and protocol (http)
 * Merges with existing options, container options take precedence
 * @param options Base ConnectionFactoryOptions to configure (cannot be null)
 * @return Configured ConnectionFactoryOptions with container details merged
 * @throws IllegalArgumentException if options is null
 * @throws IllegalStateException if wrapped container is not started
 */
public ConnectionFactoryOptions configure(ConnectionFactoryOptions options);

Usage Examples:

import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.clickhouse.ClickHouseR2DBCDatabaseContainer;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import reactor.core.publisher.Mono;

// Create and start container
ClickHouseContainer container = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine");
container.start();

// Get R2DBC connection options (static method)
ConnectionFactoryOptions options = ClickHouseR2DBCDatabaseContainer.getOptions(container);

// Create connection factory
ConnectionFactory connectionFactory = ConnectionFactories.get(options);

// Use connection factory with reactive streams
Mono.from(connectionFactory.create())
    .flatMapMany(connection ->
        Mono.from(connection.createStatement("SELECT 1").execute())
            .flatMapMany(result -> result.map((row, metadata) -> row.get(0, Integer.class)))
    )
    .subscribe(value -> System.out.println("Result: " + value));

Option Configuration Details:

  • DRIVER: Set to "clickhouse"
  • PROTOCOL: Set to "http"
  • HOST: Set to container host (from container.getHost())
  • PORT: Set to HTTP port (8123, mapped port from container.getMappedPort(8123))
  • DATABASE: Set to database name (from container.getDatabaseName())
  • USER: Set to username (from container.getUsername())
  • PASSWORD: Set to password (from container.getPassword())
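
The option values listed above map onto the segments of a plain R2DBC URL. The sketch below is a hypothetical helper, not part of Testcontainers or the ClickHouse driver, and it assumes the general R2DBC shape r2dbc:<driver>:<protocol>://<host>:<port>/<database>. The host and port in the usage note are placeholders; real values would come from container.getHost() and container.getMappedPort(8123).

```java
// Hypothetical helper (not a library API): formats option values into an
// R2DBC URL of the shape r2dbc:<driver>:<protocol>://<host>:<port>/<database>.
// Credentials are deliberately left out; pass them as separate options.
final class R2dbcUrlSketch {

    private R2dbcUrlSketch() {}

    static String toUrl(String driver, String protocol, String host, int port, String database) {
        return "r2dbc:" + driver + ":" + protocol + "://" + host + ":" + port + "/" + database;
    }
}
```

For example, R2dbcUrlSketch.toUrl("clickhouse", "http", "localhost", 32768, "default") yields r2dbc:clickhouse:http://localhost:32768/default, where 32768 stands in for whatever host port Docker assigned.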

Edge Cases:

  • Null container throws IllegalArgumentException
  • Container not started throws IllegalStateException when accessing port/host
  • Options are immutable (modifications return new instance)
  • Existing options in input are preserved, container options override
  • Static method does not require wrapper instantiation
  • Options can be extended with additional parameters
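
The precedence rule in the list above (existing options are preserved, container options override) can be pictured with a simplified model. In this sketch a plain Map stands in for ConnectionFactoryOptions, and the class and method names are hypothetical; it illustrates the merge semantics only and does not call the R2DBC API.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model: a Map stands in for ConnectionFactoryOptions.
// OptionMergeSketch and configure(...) are hypothetical names.
final class OptionMergeSketch {

    private OptionMergeSketch() {}

    static Map<String, Object> configure(Map<String, Object> base, Map<String, Object> fromContainer) {
        // Copy first, so the caller's map is never modified
        Map<String, Object> merged = new LinkedHashMap<>(base);
        // putAll overwrites matching keys: container values win on conflict
        merged.putAll(fromContainer);
        // Return a read-only view, mirroring the immutability of the real options
        return Collections.unmodifiableMap(merged);
    }
}
```

Under this model, a caller-supplied host is replaced by the container's host, while unrelated caller options (for example a timeout setting) pass through unchanged.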

Container Lifecycle

Manage the wrapped container's lifecycle. Lifecycle methods delegate to the wrapped container.

/**
 * Start the wrapped ClickHouse container
 * Blocks until the container is ready or the startup timeout expires
 * Calling start() on an already started container is expected to be a no-op
 * @throws ContainerLaunchException if container fails to start, including startup timeouts
 */
public void start();

/**
 * Stop the wrapped ClickHouse container
 * Stops and removes the container
 * Safe to call multiple times, including before start() (idempotent)
 */
public void stop();

Usage Examples:

import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.clickhouse.ClickHouseR2DBCDatabaseContainer;

ClickHouseContainer container = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine");
ClickHouseR2DBCDatabaseContainer r2dbcContainer = new ClickHouseR2DBCDatabaseContainer(container);

// Start through R2DBC wrapper
r2dbcContainer.start();

// Use container...

// Stop through R2DBC wrapper
r2dbcContainer.stop();

Lifecycle Notes:

  • start() and stop() delegate to wrapped container
  • Wrapper does not own container lifecycle (container can be started/stopped directly on the wrapped instance)
  • Calling start() on an already started container, or stop() on one that was never started, is expected to be a no-op, because GenericContainer guards both calls

R2DBC Provider

The ClickHouseR2DBCDatabaseContainerProvider class enables automatic container creation from R2DBC connection URLs. The provider is automatically discovered via Java's ServiceLoader mechanism.

/**
 * Check if this provider supports the given R2DBC connection options
 * Checks if the driver option matches "clickhouse"
 * @param options ConnectionFactoryOptions to check (cannot be null)
 * @return true if the driver option matches ClickHouse R2DBC driver, false otherwise
 * @throws IllegalArgumentException if options is null
 */
public boolean supports(ConnectionFactoryOptions options);

/**
 * Create a new R2DBC container from connection options
 * Parses URL parameters (TC_IMAGE_TAG, TC_REUSABLE, etc.) from options
 * @param options ConnectionFactoryOptions containing configuration (cannot be null)
 * @return New R2DBCDatabaseContainer instance (ClickHouseR2DBCDatabaseContainer)
 * @throws IllegalArgumentException if options is null or invalid
 * @throws ContainerLaunchException if container fails to start
 */
public R2DBCDatabaseContainer createContainer(ConnectionFactoryOptions options);

/**
 * Get connection metadata for the given options
 * Automatically adds default user and password if not specified
 * @param options ConnectionFactoryOptions to process (cannot be null)
 * @return ConnectionFactoryMetadata with defaults applied
 * @throws IllegalArgumentException if options is null
 */
public ConnectionFactoryMetadata getMetadata(ConnectionFactoryOptions options);

R2DBC URL Format

r2dbc:tc:clickhouse:///[database]?[parameters]

Components:

  • r2dbc:tc: - Testcontainers R2DBC URL prefix (required)
  • clickhouse - Database type identifier (required, case-sensitive)
  • database - Database name (optional, defaults to "default")
  • parameters - Optional URL parameters

Common Parameters:

  • TC_IMAGE_TAG - Docker image tag to use (e.g., ?TC_IMAGE_TAG=24.12-alpine)
  • TC_REUSABLE - Enable container reuse (e.g., ?TC_REUSABLE=true)
  • TC_INITSCRIPT - Path to initialization script (e.g., ?TC_INITSCRIPT=init.sql)
  • Standard R2DBC parameters (user, password) are ignored (uses container defaults)
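
As a rough illustration of the URL format and parameters above, the following sketch assembles a Testcontainers R2DBC URL from a database name and a parameter map. TcClickHouseUrl is a hypothetical helper, not something the module provides; it only concatenates and URL-encodes strings.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Testcontainers: assembles a URL in the
// r2dbc:tc:clickhouse:///[database]?[parameters] shape described above.
final class TcClickHouseUrl {

    private TcClickHouseUrl() {}

    static String build(String database, Map<String, String> parameters) {
        String base = "r2dbc:tc:clickhouse:///" + (database == null ? "" : database);
        if (parameters.isEmpty()) {
            return base;
        }
        // Encode keys and values, then join the pairs with '&'
        String query = parameters.entrySet().stream()
            .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
            .collect(Collectors.joining("&"));
        return base + "?" + query;
    }

    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }
}
```

Passing a LinkedHashMap keeps the parameters in insertion order, which makes the resulting URL predictable in logs and test output.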

URL Parsing:

  • Parameters are URL-decoded automatically
  • Multiple parameters separated by &
  • Duplicate parameters: last value wins
  • Invalid parameters are ignored (no error)
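
The parsing rules above (automatic URL decoding, & as separator, last duplicate wins) can be mimicked in a few lines of plain Java. This is an illustrative sketch only: QueryParameters is a hypothetical class and is not the parser that Testcontainers or the R2DBC SPI actually uses.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mirrors the parsing rules listed above, not the
// actual Testcontainers or R2DBC SPI parser.
final class QueryParameters {

    private QueryParameters() {}

    static Map<String, String> parse(String url) {
        Map<String, String> result = new LinkedHashMap<>();
        int queryStart = url.indexOf('?');
        if (queryStart < 0 || queryStart == url.length() - 1) {
            return result; // no query string present
        }
        for (String pair : url.substring(queryStart + 1).split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            String rawKey = eq < 0 ? pair : pair.substring(0, eq);
            String rawValue = eq < 0 ? "" : pair.substring(eq + 1);
            // put() overwrites earlier entries, so the last duplicate wins
            result.put(decode(rawKey), decode(rawValue));
        }
        return result;
    }

    private static String decode(String value) {
        return URLDecoder.decode(value, StandardCharsets.UTF_8);
    }
}
```

With a URL that repeats TC_IMAGE_TAG, only the final value survives in the returned map, and an encoded script path such as db%2Finit.sql comes back as db/init.sql.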

Usage Examples

Basic R2DBC URL usage:

import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ClickHouseR2DBCUrlTest {

    public void testWithR2dbcUrl() {
        // Create connection factory from Testcontainers R2DBC URL
        // Container is automatically created and started
        String r2dbcUrl = "r2dbc:tc:clickhouse:///testdb?TC_IMAGE_TAG=24.12-alpine";
        ConnectionFactory connectionFactory = ConnectionFactories.get(r2dbcUrl);

        // Use reactive streams for database operations
        Mono.from(connectionFactory.create())
            .flatMapMany(connection ->
                Mono.from(connection.createStatement("SELECT version()").execute())
                    .flatMapMany(result -> result.map((row, metadata) ->
                        row.get(0, String.class)))
                    .doFinally(signalType -> Mono.from(connection.close()).subscribe())
            )
            .subscribe(
                version -> System.out.println("ClickHouse version: " + version),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Complete")
            );
    }
}

R2DBC URL with multiple parameters:

import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;

public class ClickHouseR2DBCMultipleParamsTest {
    public void testWithMultipleParams() {
        String r2dbcUrl = "r2dbc:tc:clickhouse:///mydb?" +
                          "TC_IMAGE_TAG=24.12-alpine&" +
                          "TC_REUSABLE=true&" +
                          "TC_INITSCRIPT=schema.sql";

        ConnectionFactory factory = ConnectionFactories.get(r2dbcUrl);
        // Use factory...
    }
}

Complete Examples

Basic R2DBC Usage

import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.clickhouse.ClickHouseR2DBCDatabaseContainer;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ClickHouseR2DBCExample {

    public void basicR2dbcUsage() {
        // Create and configure container
        ClickHouseContainer container = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine")
            .withDatabaseName("testdb")
            .withUsername("user")
            .withPassword("pass");

        container.start();

        try {
            // Get R2DBC connection options
            ConnectionFactoryOptions options = ClickHouseR2DBCDatabaseContainer.getOptions(container);

            // Create connection factory
            ConnectionFactory connectionFactory = ConnectionFactories.get(options);

            // Perform reactive database operations
            Mono.from(connectionFactory.create())
                .flatMapMany(connection -> {
                    // Create table
                    Mono<Void> createTable = Mono.from(
                        connection.createStatement(
                            "CREATE TABLE IF NOT EXISTS users (" +
                            "  id UInt32, " +
                            "  name String" +
                            ") ENGINE = MergeTree() ORDER BY id"
                        ).execute()
                    ).then();

                    // Insert data
                    Mono<Void> insertData = Mono.from(
                        connection.createStatement(
                            "INSERT INTO users (id, name) VALUES (?, ?)"
                        )
                        .bind(0, 1)
                        .bind(1, "Alice")
                        .execute()
                    ).then();

                    // Query data
                    Flux<String> queryData = Mono.from(
                        connection.createStatement("SELECT name FROM users WHERE id = ?")
                            .bind(0, 1)
                            .execute()
                    )
                    .flatMapMany(result -> result.map((row, metadata) ->
                        row.get("name", String.class)));

                    // Chain operations
                    return createTable
                        .then(insertData)
                        .thenMany(queryData)
                        .doFinally(signalType -> Mono.from(connection.close()).subscribe());
                })
                .subscribe(
                    name -> System.out.println("User: " + name),
                    error -> System.err.println("Error: " + error),
                    () -> System.out.println("Complete")
                );

            // Wait for async operations (in test, use proper waiting mechanism)
            Thread.sleep(2000);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            container.stop();
        }
    }
}
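
The Thread.sleep(2000) call above is a placeholder for a proper waiting mechanism. One option is a CountDownLatch that is released from the completion and error callbacks. The sketch below uses only the JDK: a CompletableFuture stands in for the reactive pipeline, and AwaitCompletionSketch is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: the CompletableFuture is a stand-in for a reactive pipeline.
// With subscribe(...), the latch would be counted down from the error
// and completion callbacks instead.
final class AwaitCompletionSketch {

    private AwaitCompletionSketch() {}

    static String runAndAwait(long timeoutSeconds) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();

        CompletableFuture
            .supplyAsync(() -> "Alice") // placeholder for the asynchronous query
            .whenComplete((value, error) -> {
                result.set(error == null ? value : "error: " + error);
                done.countDown(); // signal on success and on failure
            });

        try {
            // Block the calling thread until the callback fires or the timeout expires
            if (!done.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for async work");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return result.get();
    }
}
```

Unlike a fixed sleep, the latch returns as soon as the work finishes and fails loudly on timeout. In Reactor-based tests, blocking operators such as blockLast(Duration) or StepVerifier from reactor-test are alternatives worth considering.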

R2DBC URL-Based Usage

import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import reactor.core.publisher.Mono;

public class ClickHouseR2DBCUrlExample {

    public void urlBasedR2dbcUsage() {
        // Use Testcontainers R2DBC URL for automatic container management
        String r2dbcUrl = "r2dbc:tc:clickhouse:///mydb?" +
                          "TC_IMAGE_TAG=24.12-alpine&" +
                          "TC_REUSABLE=true";

        ConnectionFactory factory = ConnectionFactories.get(r2dbcUrl);

        // Execute query using reactive streams
        Mono.from(factory.create())
            .flatMapMany(connection ->
                Mono.from(connection.createStatement("SELECT 1 as result").execute())
                    .flatMapMany(result -> result.map((row, metadata) ->
                        row.get("result", Integer.class)))
                    .doFinally(signalType -> Mono.from(connection.close()).subscribe())
            )
            .subscribe(
                value -> System.out.println("Result: " + value),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Complete")
            );
    }
}

Spring WebFlux Integration

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;

@Configuration
@EnableR2dbcRepositories
public class R2dbcConfig extends AbstractR2dbcConfiguration {

    @Bean
    @Override
    public ConnectionFactory connectionFactory() {
        // Use Testcontainers R2DBC URL for integration tests
        String r2dbcUrl = "r2dbc:tc:clickhouse:///testdb?" +
                          "TC_IMAGE_TAG=24.12-alpine&" +
                          "TC_REUSABLE=true";

        return ConnectionFactories.get(r2dbcUrl);
    }
}

Spring Boot Test with Dynamic Properties:

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.clickhouse.ClickHouseR2DBCDatabaseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;

@SpringBootTest
@Testcontainers
public class SpringWebFluxIntegrationTest {
    @Container
    static ClickHouseContainer clickhouse = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine");

    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        ConnectionFactoryOptions options = ClickHouseR2DBCDatabaseContainer.getOptions(clickhouse);
        registry.add("spring.r2dbc.url", () -> "r2dbc:clickhouse://" + 
            options.getRequiredValue(ConnectionFactoryOptions.HOST) + ":" +
            options.getRequiredValue(ConnectionFactoryOptions.PORT) + "/" +
            options.getRequiredValue(ConnectionFactoryOptions.DATABASE));
        registry.add("spring.r2dbc.username", () -> 
            options.getRequiredValue(ConnectionFactoryOptions.USER));
        registry.add("spring.r2dbc.password", () -> 
            options.getRequiredValue(ConnectionFactoryOptions.PASSWORD));
    }

    // Tests use Spring-managed R2DBC ConnectionFactory
}

Project Reactor Example

import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.clickhouse.ClickHouseR2DBCDatabaseContainer;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;

public class ReactorExample {

    public void reactiveStreamProcessing() {
        ClickHouseContainer container = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine");
        container.start();

        try {
            ConnectionFactoryOptions options = ClickHouseR2DBCDatabaseContainer.getOptions(container);
            ConnectionFactory factory = ConnectionFactories.get(options);

            // Process data reactively with backpressure
            Flux.usingWhen(
                Mono.from(factory.create()), // Create connection
                connection -> {
                    // Create table and insert test data
                    return Mono.from(
                        connection.createStatement(
                            "CREATE TABLE IF NOT EXISTS numbers (value UInt32) ENGINE = Memory"
                        ).execute()
                    )
                    .then(Mono.from(
                        connection.createStatement(
                            "INSERT INTO numbers SELECT number FROM numbers(100)"
                        ).execute()
                    ))
                    .thenMany(
                        // Query data as stream
                        Mono.from(
                            connection.createStatement("SELECT value FROM numbers").execute()
                        )
                        .flatMapMany(result ->
                            result.map((row, metadata) -> row.get("value", Integer.class))
                        )
                    );
                },
                connection -> Mono.from(connection.close()) // Close connection
            )
            .filter(value -> value % 2 == 0) // Filter even numbers
            .map(value -> value * 2)          // Transform
            .buffer(10)                        // Batch processing
            .delayElements(Duration.ofMillis(100)) // Rate limiting
            .subscribe(
                batch -> System.out.println("Processed batch: " + batch),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Processing complete")
            );

            // Keep application running for async operations
            Thread.sleep(5000);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            container.stop();
        }
    }
}

Error Handling in Reactive Streams

import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactories;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;

public class R2DBCErrorHandlingExample {
    public void handleErrors() {
        String r2dbcUrl = "r2dbc:tc:clickhouse:///testdb?TC_IMAGE_TAG=24.12-alpine";
        ConnectionFactory factory = ConnectionFactories.get(r2dbcUrl);

        // usingWhen closes the connection on completion, error, and cancellation,
        // so every retry attempt releases the connection it opened
        Flux.usingWhen(
                Mono.from(factory.create()),
                connection ->
                    Mono.from(connection.createStatement("SELECT * FROM nonexistent_table").execute())
                        .flatMapMany(result -> result.map((row, metadata) -> row)),
                connection -> Mono.from(connection.close())
            )
            // Each retry resubscribes, which opens a fresh connection
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
            .onErrorResume(error -> {
                System.err.println("Error after retries: " + error.getMessage());
                return Mono.empty();
            })
            .subscribe();
    }
}
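
Retry.backoff(3, Duration.ofSeconds(1)) in the example above allows up to three retries with growing delays. As a rough picture of that schedule, the plain-Java sketch below doubles the delay on each attempt. It is a simplification under stated assumptions: BackoffSchedule is a hypothetical class, and Reactor additionally applies random jitter and an upper bound, both omitted here.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of an exponential backoff schedule. Reactor's
// Retry.backoff also adds random jitter by default, which is omitted here.
final class BackoffSchedule {

    private BackoffSchedule() {}

    static List<Duration> delays(int maxRetries, Duration minBackoff) {
        List<Duration> result = new ArrayList<>();
        Duration next = minBackoff;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            result.add(next);
            next = next.multipliedBy(2); // double the wait before the following retry
        }
        return result;
    }
}
```

For three retries starting at one second, the nominal delays are 1 s, 2 s, and 4 s, so a query that keeps failing is reported roughly seven seconds after the first attempt, before jitter. Retrying only helps with transient failures; a query against a table that does not exist will fail on every attempt.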

Configuration Details

The R2DBC support automatically configures:

  • Driver: ClickHouse R2DBC driver (clickhouse)
  • Protocol: http (uses ClickHouse HTTP interface)
  • Port: 8123 (HTTP port, mapped to random host port)
  • Host: Container host from getHost() (typically "localhost")
  • Database: From container configuration (default: "default")
  • User: From container configuration (default: "test")
  • Password: From container configuration (default: "test")

Connection Factory Options:

  • Options are immutable and thread-safe
  • Options can be extended with additional parameters
  • Options are validated when connection factory is created

Dependencies Required

To use R2DBC support, add the following dependencies:

Maven:

<!-- Testcontainers R2DBC module -->
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>r2dbc</artifactId>
    <version>1.21.4</version>
    <scope>test</scope>
</dependency>

<!-- ClickHouse R2DBC driver -->
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-r2dbc</artifactId>
    <version>0.7.2</version>
    <classifier>http</classifier>
    <scope>test</scope>
</dependency>

<!-- Project Reactor (usually provided transitively) -->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.5.0</version>
    <scope>test</scope>
</dependency>

Gradle:

testImplementation "org.testcontainers:r2dbc:1.21.4"
testImplementation "com.clickhouse:clickhouse-r2dbc:0.7.2:http"
testImplementation "io.projectreactor:reactor-core:3.5.0"

Edge Cases and Error Scenarios

Missing Dependencies:

// If the ClickHouse R2DBC driver is not on the classpath
ConnectionFactoryOptions options = ClickHouseR2DBCDatabaseContainer.getOptions(container);
// No error at this point, error occurs when creating connection factory

ConnectionFactory factory = ConnectionFactories.get(options);
// Throws IllegalStateException: no available driver supports the options

Container Not Started:

ClickHouseContainer container = new ClickHouseContainer("clickhouse/clickhouse-server:24.12-alpine");
// Container not started

ConnectionFactoryOptions options = ClickHouseR2DBCDatabaseContainer.getOptions(container);
// Throws IllegalStateException when accessing port/host

Invalid R2DBC URL:

// Missing database type
String url = "r2dbc:tc:///testdb";
ConnectionFactory factory = ConnectionFactories.get(url);
// Throws IllegalArgumentException or uses wrong provider

Connection Failures:

// Container stops while connection is active
Mono.from(factory.create())
    .flatMapMany(connection -> {
        container.stop(); // Stop container
        return Mono.from(connection.createStatement("SELECT 1").execute())
            .flatMapMany(result -> result.map((row, metadata) -> row));
    })
    .subscribe(
        value -> {},
        error -> {
            // Error: connection lost, container stopped
            System.err.println("Connection error: " + error.getMessage());
        }
    );

Reactive Stream Errors:

// Errors in reactive streams must be handled
Mono.from(factory.create())
    .flatMapMany(connection ->
        Mono.from(connection.createStatement("SELECT 1").execute())
            .flatMapMany(result -> result.map((row, metadata) -> row))
    )
    .subscribe(
        value -> System.out.println("Value: " + value),
        error -> System.err.println("Error: " + error), // Must handle errors
        () -> System.out.println("Complete")
    );

Connection Not Closed:

// Connections must be closed explicitly
Mono.from(factory.create())
    .flatMapMany(connection -> {
        // Use connection...
        return Flux.just("data");
        // Connection not closed - resource leak!
    })
    .subscribe();

// Correct: Close connection
Mono.from(factory.create())
    .flatMapMany(connection ->
        Flux.just("data")
            .doFinally(signalType -> Mono.from(connection.close()).subscribe())
    )
    .subscribe();