# R2DBC Integration

Non-blocking, reactive access to MySQL containers through R2DBC, connecting Testcontainers to R2DBC reactive streams.

## Capabilities

### MySQLR2DBCDatabaseContainer

Wrapper class that supplies R2DBC `ConnectionFactoryOptions` for a MySQL container, enabling reactive database access.

```java { .api }
/**
 * R2DBC wrapper for MySQLContainer providing reactive database access
 * Implements R2DBCDatabaseContainer interface
 */
public class MySQLR2DBCDatabaseContainer implements R2DBCDatabaseContainer {

    /**
     * Creates R2DBC ConnectionFactoryOptions for the given MySQL container
     * Static factory method that configures R2DBC connection options
     * @param container MySQL container to create options for
     * @return Configured ConnectionFactoryOptions for R2DBC usage
     */
    public static ConnectionFactoryOptions getOptions(MySQLContainer<?> container);

    /**
     * Configures R2DBC connection options with container connection details
     * @param options Base ConnectionFactoryOptions to configure
     * @return Configured ConnectionFactoryOptions with host, port, database, and credentials
     */
    public ConnectionFactoryOptions configure(ConnectionFactoryOptions options);
}
```

**Usage Examples:**

```java
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.MySQLR2DBCDatabaseContainer;
import reactor.core.publisher.Mono;

// Create and start MySQL container
MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0")
    .withDatabaseName("reactive_db")
    .withUsername("r2dbc_user")
    .withPassword("r2dbc_pass");

mysql.start();

// Get R2DBC connection factory options (the container must already be running,
// because the mapped port is only known after startup)
ConnectionFactoryOptions options = MySQLR2DBCDatabaseContainer.getOptions(mysql);

// Create R2DBC connection factory
ConnectionFactory connectionFactory = ConnectionFactories.get(options);

// Use reactive database operations; no connection is opened until the Mono is subscribed
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
```

### MySQLR2DBCDatabaseContainerProvider

Provider that creates MySQL R2DBC containers from R2DBC connection factory options, so a container can be created directly from R2DBC configuration.

```java { .api }
/**
 * Provider for creating MySQL R2DBC containers from ConnectionFactoryOptions
 * Implements R2DBCDatabaseContainerProvider interface
 */
public class MySQLR2DBCDatabaseContainerProvider implements R2DBCDatabaseContainerProvider {

    /**
     * MySQL R2DBC driver identifier used for provider matching
     */
    static final String DRIVER = MySqlConnectionFactoryProvider.MYSQL_DRIVER;

    /**
     * Checks if this provider supports the given connection factory options
     * @param options R2DBC ConnectionFactoryOptions to check
     * @return true if options specify MySQL R2DBC driver, false otherwise
     */
    public boolean supports(ConnectionFactoryOptions options);

    /**
     * Creates an R2DBC database container from connection factory options
     * Automatically configures MySQL container with settings from options
     * @param options R2DBC ConnectionFactoryOptions containing configuration
     * @return Configured MySQLR2DBCDatabaseContainer instance
     */
    public R2DBCDatabaseContainer createContainer(ConnectionFactoryOptions options);

    /**
     * Returns connection factory metadata for the given options
     * Provides default credentials if not specified in options
     * @param options R2DBC ConnectionFactoryOptions
     * @return ConnectionFactoryMetadata with defaults applied
     */
    @Nullable
    public ConnectionFactoryMetadata getMetadata(ConnectionFactoryOptions options);
}
```

**Usage Examples:**

```java
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.testcontainers.containers.MySQLR2DBCDatabaseContainerProvider;
import org.testcontainers.r2dbc.R2DBCDatabaseContainer;
import org.testcontainers.r2dbc.R2DBCDatabaseContainerProvider;

// Create connection options for MySQL R2DBC
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
    .option(ConnectionFactoryOptions.DRIVER, "mysql")
    .option(ConnectionFactoryOptions.DATABASE, "test_reactive")
    .option(ConnectionFactoryOptions.USER, "test")
    .option(ConnectionFactoryOptions.PASSWORD, "test")
    // createContainer() reads the image tag with getRequiredValue(), so it must be present
    .option(R2DBCDatabaseContainerProvider.IMAGE_TAG_OPTION, "8.0")
    .build();

// Create provider and check support
MySQLR2DBCDatabaseContainerProvider provider = new MySQLR2DBCDatabaseContainerProvider();
boolean supported = provider.supports(options); // true

// Create R2DBC container from options
R2DBCDatabaseContainer r2dbcContainer = provider.createContainer(options);

// Start container and get connection options
r2dbcContainer.start();
ConnectionFactoryOptions configuredOptions = r2dbcContainer.configure(options);
```

### Delegated Container Lifecycle

`MySQLR2DBCDatabaseContainer` delegates the `Startable` lifecycle methods to the wrapped `MySQLContainer` through Lombok's `@Delegate` annotation. State queries such as `isRunning()` and `isCreated()` are not part of `Startable`; call them on the wrapped container.

**Delegated Methods (from Startable interface):**

```java { .api }
// Startable lifecycle methods delegated to the wrapped MySQLContainer
public void start();
public void stop();
public void close();                      // Startable's default implementation calls stop()
public Set<Startable> getDependencies();
// isRunning() and isCreated() are available on the wrapped MySQLContainer only
```

**Usage Examples:**

```java
MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0");
MySQLR2DBCDatabaseContainer r2dbcContainer = new MySQLR2DBCDatabaseContainer(mysql);

// Lifecycle operations work directly on the R2DBC container
r2dbcContainer.start();

// Starting the wrapper starts the underlying MySQL container.
// State is queried on the wrapped container, which exposes isRunning().
assertTrue(mysql.isRunning());

r2dbcContainer.stop();
assertFalse(mysql.isRunning());
```

### Connection Factory Configuration

`configure()` overlays the running container's connection details onto the R2DBC options it is given.

```java { .api }
/**
 * Connection factory configuration details applied by configure() method:
 * - HOST: Container host address
 * - PORT: Mapped MySQL port (3306 -> random host port)
 * - DATABASE: Configured database name
 * - USER: Database username
 * - PASSWORD: Database password
 * DRIVER is not set here; it is carried over unchanged from the options passed in
 */
public ConnectionFactoryOptions configure(ConnectionFactoryOptions options) {
    return options.mutate()
        .option(ConnectionFactoryOptions.HOST, container.getHost())
        .option(ConnectionFactoryOptions.PORT, container.getMappedPort(MySQLContainer.MYSQL_PORT))
        .option(ConnectionFactoryOptions.DATABASE, container.getDatabaseName())
        .option(ConnectionFactoryOptions.USER, container.getUsername())
        .option(ConnectionFactoryOptions.PASSWORD, container.getPassword())
        .build();
}
```

**Configuration Examples:**

```java
// Manual configuration
MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0")
    .withDatabaseName("reactive_app")
    .withUsername("app_user")
    .withPassword("app_secret");

mysql.start();

ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.builder()
    .option(ConnectionFactoryOptions.DRIVER, "mysql")
    .build();

MySQLR2DBCDatabaseContainer r2dbcContainer = new MySQLR2DBCDatabaseContainer(mysql);
ConnectionFactoryOptions finalOptions = r2dbcContainer.configure(baseOptions);

// Final options will contain:
// - HOST: mysql.getHost() (e.g., "localhost")
// - PORT: mysql.getMappedPort(3306) (e.g., 32768)
// - DATABASE: "reactive_app"
// - USER: "app_user"
// - PASSWORD: "app_secret"
// - DRIVER: "mysql" (kept from baseOptions, not set by configure())
```
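The overlay behaviour of `configure()` can be illustrated without any R2DBC dependency. The following is a minimal sketch that uses a plain `Map` as a stand-in for `ConnectionFactoryOptions`; the class name `OptionsOverlaySketch` and the string keys are hypothetical and chosen only for illustration. It shows that entries already present in the base options (such as the driver) survive, while connection details are overwritten with the container's values and the base options themselves are left untouched.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in: a Map plays the role of ConnectionFactoryOptions.
final class OptionsOverlaySketch {

    // Copies the base options (comparable to options.mutate()) and then
    // overwrites the connection details with the container's values.
    static Map<String, Object> configure(Map<String, Object> base,
                                         String host, int port,
                                         String database, String user, String password) {
        Map<String, Object> merged = new LinkedHashMap<>(base);
        merged.put("host", host);
        merged.put("port", port);
        merged.put("database", database);
        merged.put("user", user);
        merged.put("password", password);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new LinkedHashMap<>();
        base.put("driver", "mysql");

        Map<String, Object> finalOptions =
            configure(base, "localhost", 32768, "reactive_app", "app_user", "app_secret");

        // The driver entry is carried over; the other entries come from the arguments.
        System.out.println(finalOptions);
    }
}
```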

### Reactive Database Operations

A complete example of using R2DBC with a Testcontainers MySQL instance for reactive database operations.

**Basic Reactive Operations:**

```java
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.MySQLR2DBCDatabaseContainer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

@Test
void testReactiveOperations() {
    MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0")
        .withDatabaseName("reactive_test")
        .withInitScript("init/reactive-schema.sql");

    mysql.start();

    ConnectionFactoryOptions options = MySQLR2DBCDatabaseContainer.getOptions(mysql);
    ConnectionFactory connectionFactory = ConnectionFactories.get(options);

    // usingWhen subscribes to connection.close() on completion, error, or cancellation.
    // Calling close() without subscribing to the returned Publisher would close nothing.
    Mono<Void> result = Mono.usingWhen(
        connectionFactory.create(),
        connection -> {
            // Create table reactively
            Statement createTable = connection.createStatement(
                "CREATE TABLE reactive_users (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(100))"
            );

            return Flux.from(createTable.execute())
                .flatMap(Result::getRowsUpdated)
                .then(insertUsers(connection))
                .then(queryUsers(connection));
        },
        Connection::close
    );

    // Execute reactive pipeline
    StepVerifier.create(result)
        .verifyComplete();
}

private Mono<Void> insertUsers(Connection connection) {
    // add() saves the current binding and starts a new one, so each name becomes its own row;
    // the last binding is executed without a trailing add()
    Statement insert = connection.createStatement("INSERT INTO reactive_users (name) VALUES (?)")
        .bind(0, "Alice").add()
        .bind(0, "Bob").add()
        .bind(0, "Charlie");

    return Flux.from(insert.execute())
        .flatMap(Result::getRowsUpdated)
        .then();
}

private Mono<Void> queryUsers(Connection connection) {
    Statement select = connection.createStatement("SELECT id, name FROM reactive_users");

    return Flux.from(select.execute())
        // User is an application-defined class with an (Integer id, String name) constructor
        .flatMap(result -> result.map((row, metadata) ->
            new User(row.get("id", Integer.class), row.get("name", String.class))))
        .doOnNext(user -> System.out.println("User: " + user))
        .then();
}
```

### Integration with Spring Data R2DBC

Integration examples with Spring Data R2DBC for reactive repositories and transactions.

**Spring Data R2DBC Configuration:**

```java
@TestConfiguration
public class R2DBCTestConfiguration {

    // @Container fields are only managed when declared on the @Testcontainers test class itself,
    // so the container is started explicitly here and shared for the lifetime of the JVM
    static final MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0")
        .withDatabaseName("spring_r2dbc_test")
        .withUsername("spring_user")
        .withPassword("spring_pass");

    static {
        mysql.start();
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactoryOptions options = MySQLR2DBCDatabaseContainer.getOptions(mysql);
        return ConnectionFactories.get(options);
    }

    @Bean
    public R2dbcTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }
}

// Reactive repository usage
@Repository
public interface ReactiveUserRepository extends ReactiveCrudRepository<User, Long> {

    // CONCAT builds the prefix pattern from the bound parameter
    @Query("SELECT * FROM users WHERE name LIKE CONCAT(:name, '%')")
    Flux<User> findByNameStartingWith(String name);

    @Query("SELECT COUNT(*) FROM users")
    Mono<Long> countUsers();
}

@SpringBootTest
@Import(R2DBCTestConfiguration.class)
class SpringR2DBCIntegrationTest {

    @Autowired
    private ReactiveUserRepository userRepository;

    @Test
    void testReactiveRepository() {
        // Assumes the users table already exists and is empty when the test starts
        User newUser = new User(null, "Test User");

        StepVerifier.create(
            userRepository.save(newUser)
                .then(userRepository.countUsers())
        )
            .expectNext(1L)
            .verifyComplete();
    }
}
```

### Provider Options and Container Reuse

Options that control which MySQL image the provider starts and whether the container is reused.

```java { .api }
/**
 * Provider supports additional options for container configuration:
 * - IMAGE_TAG_OPTION: MySQL image tag (required, read with getRequiredValue)
 * - REUSABLE_OPTION: Enable container reuse across test runs (optional)
 * DATABASE is also required; it becomes the container's database name
 */
public R2DBCDatabaseContainer createContainer(ConnectionFactoryOptions options) {
    String image = MySQLContainer.IMAGE + ":" + options.getRequiredValue(IMAGE_TAG_OPTION);
    MySQLContainer<?> container = new MySQLContainer<>(image)
        .withDatabaseName((String) options.getRequiredValue(ConnectionFactoryOptions.DATABASE));

    if (Boolean.TRUE.equals(options.getValue(REUSABLE_OPTION))) {
        container.withReuse(true);
    }

    return new MySQLR2DBCDatabaseContainer(container);
}
```

**Usage Examples:**

```java
import static org.testcontainers.r2dbc.R2DBCDatabaseContainerProvider.IMAGE_TAG_OPTION;
import static org.testcontainers.r2dbc.R2DBCDatabaseContainerProvider.REUSABLE_OPTION;

// R2DBC provider with container reuse
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
    .option(ConnectionFactoryOptions.DRIVER, "mysql")
    .option(ConnectionFactoryOptions.DATABASE, "persistent_test_db")
    .option(IMAGE_TAG_OPTION, "8.0")
    .option(REUSABLE_OPTION, true)
    .build();

MySQLR2DBCDatabaseContainerProvider provider = new MySQLR2DBCDatabaseContainerProvider();
R2DBCDatabaseContainer container = provider.createContainer(options);

// Container will be reused across multiple test runs, provided reuse is also
// enabled in the local Testcontainers configuration (testcontainers.reuse.enable=true)
container.start();
```
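Testcontainers' R2DBC support can also be driven by an R2DBC URL that uses the `tc` driver, in which case the database name and image tag travel in the URL instead of a hand-built `ConnectionFactoryOptions`. The sketch below only assembles such a URL with the JDK. It assumes that `TC_IMAGE_TAG` is the query parameter behind `IMAGE_TAG_OPTION`; the helper class `TcR2dbcUrl` is hypothetical and not part of any library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: assembles an "r2dbc:tc:" URL from its parts.
final class TcR2dbcUrl {

    // Produces r2dbc:tc:<driver>:///<database>[?key=value&...]
    static String build(String driver, String database, Map<String, String> tcOptions) {
        String url = "r2dbc:tc:" + driver + ":///" + database;
        if (tcOptions.isEmpty()) {
            return url;
        }
        StringJoiner query = new StringJoiner("&");
        tcOptions.forEach((key, value) -> query.add(key + "=" + value));
        return url + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> tcOptions = new LinkedHashMap<>();
        // Assumed parameter name for the image tag option
        tcOptions.put("TC_IMAGE_TAG", "8.0");

        // prints r2dbc:tc:mysql:///persistent_test_db?TC_IMAGE_TAG=8.0
        System.out.println(build("mysql", "persistent_test_db", tcOptions));
    }
}
```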

### Error Handling and Best Practices

**R2DBC Error Handling:**

```java
@Test
void testR2DBCErrorHandling() {
    MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0");
    mysql.start();

    ConnectionFactoryOptions options = MySQLR2DBCDatabaseContainer.getOptions(mysql);
    ConnectionFactory connectionFactory = ConnectionFactories.get(options);

    // Handle connection errors
    Mono<Connection> connectionMono = Mono.<Connection>from(connectionFactory.create())
        .onErrorMap(R2dbcException.class, ex ->
            new RuntimeException("Failed to connect to test database", ex));

    // Handle SQL errors; usingWhen closes the connection even when the statement fails
    Mono<Void> operation = Mono.usingWhen(
        connectionMono,
        connection -> Flux.from(connection.createStatement("INVALID SQL").execute())
            .flatMap(Result::getRowsUpdated)
            .onErrorMap(R2dbcBadGrammarException.class, ex ->
                new RuntimeException("SQL syntax error in test", ex))
            .then(),
        Connection::close
    );

    StepVerifier.create(operation)
        .expectErrorMatches(throwable ->
            throwable instanceof RuntimeException &&
            throwable.getMessage().contains("SQL syntax error"))
        .verify();
}
```

**Best Practices:**

1. **Resource Management**: Always close connections inside the reactive chain; `Connection.close()` returns a `Publisher` that does nothing until it is subscribed
2. **Error Handling**: Map driver exceptions to descriptive errors so that test failures are easy to read
3. **Container Lifecycle**: Start the container before creating a connection factory, because the mapped port is unknown until then
4. **Test Isolation**: Use separate databases for concurrent tests
5. **Performance**: Consider container reuse for development environments
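
For the test isolation point, one simple approach is to give each test its own database name and pass it to `withDatabaseName(...)`. The following is a minimal sketch under that assumption; the helper class `DatabaseNames` is hypothetical. It appends a random UUID to a prefix and rejects names that contain characters outside letters, digits, and underscores or that exceed MySQL's 64-character identifier limit.

```java
import java.util.UUID;

// Hypothetical helper for generating one database name per test.
final class DatabaseNames {

    private static final int MYSQL_MAX_IDENTIFIER_LENGTH = 64;

    // Returns "<prefix>_<32 hex characters>", or throws if the result is not a safe MySQL identifier.
    static String unique(String prefix) {
        String suffix = UUID.randomUUID().toString().replace("-", "");
        String name = prefix + "_" + suffix;
        if (!name.matches("[A-Za-z0-9_]+") || name.length() > MYSQL_MAX_IDENTIFIER_LENGTH) {
            throw new IllegalArgumentException("Invalid database name: " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        // Each call yields a different name, so concurrent tests do not share a database
        System.out.println(unique("orders_test"));
        System.out.println(unique("orders_test"));
    }
}
```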

**Transaction Management:**

```java
@Test
void testReactiveTransactions() {
    // Use Spring's R2dbcTransactionManager for proper transaction handling.
    // transactionManager, userRepository, user1 and user2 are assumed to be fields of the test class.
    TransactionalOperator transactionalOperator = TransactionalOperator.create(transactionManager);

    Mono<Void> transactionalOperation = userRepository.save(user1)
        .then(userRepository.save(user2))
        .then()
        .as(transactionalOperator::transactional);

    StepVerifier.create(transactionalOperation)
        .verifyComplete();
}