0
# Testing Utilities
1
2
Additional testing utilities for advanced scenarios including race condition testing, log output verification, and custom formatting support.
3
4
## Capabilities
5
6
### Race Condition Testing
7
8
RaceTestUtils provides utilities for testing race conditions by synchronizing concurrent operations.
9
10
```java { .api }
11
class RaceTestUtils {
12
/** Generic race condition testing with state management */
13
static <T> T race(
14
T initial,
15
Function<? super T, ? extends T> race,
16
Predicate<? super T> stopRace,
17
BiPredicate<? super T, ? super T> terminate
18
);
19
20
/** Synchronize execution of multiple Runnables */
21
static void race(Runnable... runnables);
22
23
/** Synchronize two Runnables (binary compatibility) */
24
static void race(Runnable r1, Runnable r2);
25
26
/** Race on specific scheduler */
27
static void race(Scheduler scheduler, Runnable... runnables);
28
29
/** Race with configurable timeout */
30
static void race(int timeoutSeconds, Scheduler scheduler, Runnable... runnables);
31
}
32
```
33
34
**Usage Examples:**
35
36
```java
37
import reactor.test.util.RaceTestUtils;
38
39
@Test
40
public void testConcurrentModification() {
41
AtomicInteger counter = new AtomicInteger(0);
42
List<Integer> results = new CopyOnWriteArrayList<>();
43
44
// Race multiple operations
45
RaceTestUtils.race(
46
() -> results.add(counter.incrementAndGet()),
47
() -> results.add(counter.incrementAndGet()),
48
() -> results.add(counter.incrementAndGet())
49
);
50
51
// Verify all operations completed
52
assert results.size() == 3;
53
assert counter.get() == 3;
54
// Order may vary due to race conditions
55
}
56
57
@Test
58
public void testPublisherConcurrency() {
59
TestPublisher<String> publisher = TestPublisher.create();
60
AtomicReference<String> result1 = new AtomicReference<>();
61
AtomicReference<String> result2 = new AtomicReference<>();
62
63
// Setup subscribers
64
publisher.flux().subscribe(result1::set);
65
publisher.flux().subscribe(result2::set);
66
67
// Race signal emission and subscription
68
RaceTestUtils.race(
69
() -> publisher.next("value1"),
70
() -> publisher.next("value2")
71
);
72
73
// Both subscribers should receive a value
74
assert result1.get() != null;
75
assert result2.get() != null;
76
}
77
78
@Test
79
public void testSchedulerRacing() {
80
Scheduler testScheduler = Schedulers.parallel();
81
AtomicInteger executionCount = new AtomicInteger(0);
82
83
// Race on specific scheduler
84
RaceTestUtils.race(testScheduler,
85
() -> executionCount.incrementAndGet(),
86
() -> executionCount.incrementAndGet(),
87
() -> executionCount.incrementAndGet()
88
);
89
90
assert executionCount.get() == 3;
91
}
92
```
93
94
### State-Based Race Testing
95
96
Advanced race testing with state management:
97
98
```java
99
@Test
100
public void testStatefulRaceConditions() {
101
// Test concurrent map updates
102
Map<String, Integer> map = new ConcurrentHashMap<>();
103
104
String result = RaceTestUtils.race(
105
"initial", // Initial state
106
state -> { // Race function
107
map.put("key", map.getOrDefault("key", 0) + 1);
108
return state + "_updated";
109
},
110
state -> state.length() > 50, // Stop condition
111
(prev, curr) -> !prev.equals(curr) // Termination condition
112
);
113
114
// Verify final state
115
assert map.containsKey("key");
116
assert result.contains("updated");
117
}
118
```
119
120
### Log Output Verification
121
122
TestLogger and LoggerUtils provide log capture and verification capabilities.
123
124
```java { .api }
125
class TestLogger implements Logger {
126
/** Default constructor with thread name logging enabled */
127
TestLogger();
128
129
/** Constructor with thread name option */
130
TestLogger(boolean logCurrentThreadName);
131
132
/** Get error stream content as string */
133
String getErrContent();
134
135
/** Get output stream content as string */
136
String getOutContent();
137
138
/** Clear both output buffers */
139
void reset();
140
141
/** Check thread name logging setting */
142
boolean isLogCurrentThreadName();
143
144
// Standard Logger methods
145
void trace(String msg);
146
void debug(String msg);
147
void info(String msg);
148
void warn(String msg);
149
void error(String msg);
150
151
// Formatted logging
152
void trace(String format, Object... arguments);
153
void debug(String format, Object... arguments);
154
void info(String format, Object... arguments);
155
void warn(String format, Object... arguments);
156
void error(String format, Object... arguments);
157
158
// With throwable
159
void trace(String msg, Throwable t);
160
void debug(String msg, Throwable t);
161
void info(String msg, Throwable t);
162
void warn(String msg, Throwable t);
163
void error(String msg, Throwable t);
164
165
// Level checks
166
boolean isTraceEnabled();
167
boolean isDebugEnabled();
168
boolean isInfoEnabled();
169
boolean isWarnEnabled();
170
boolean isErrorEnabled();
171
172
String getName();
173
}
174
```
175
176
**Usage Examples:**
177
178
```java
179
import reactor.test.util.TestLogger;
180
181
@Test
182
public void testLoggingOutput() {
183
TestLogger logger = new TestLogger();
184
185
// Simulate logging
186
logger.info("Processing started");
187
logger.warn("Low memory warning");
188
logger.error("Processing failed", new RuntimeException("Test error"));
189
190
// Verify log output
191
String output = logger.getOutContent();
192
assert output.contains("Processing started");
193
assert output.contains("Low memory warning");
194
195
String errorOutput = logger.getErrContent();
196
assert errorOutput.contains("Processing failed");
197
assert errorOutput.contains("RuntimeException");
198
assert errorOutput.contains("Test error");
199
200
// Reset for next test
201
logger.reset();
202
assert logger.getOutContent().isEmpty();
203
assert logger.getErrContent().isEmpty();
204
}
205
206
@Test
207
public void testThreadNameLogging() {
208
TestLogger withThreadName = new TestLogger(true);
209
TestLogger withoutThreadName = new TestLogger(false);
210
211
withThreadName.info("Test message");
212
withoutThreadName.info("Test message");
213
214
String withThread = withThreadName.getOutContent();
215
String withoutThread = withoutThreadName.getOutContent();
216
217
// Thread name logger includes thread info
218
assert withThread.contains(Thread.currentThread().getName());
219
assert !withoutThread.contains(Thread.currentThread().getName());
220
}
221
```
222
223
### Logger Utilities
224
225
LoggerUtils provides global log capture for Reactor's internal logging.
226
227
```java { .api }
228
class LoggerUtils {
229
/** Install capturing logger factory */
230
static void useCurrentLoggersWithCapture();
231
232
/** Enable log capture to specific logger */
233
static void enableCaptureWith(Logger testLogger);
234
235
/** Enable capture with optional redirect to original */
236
static void enableCaptureWith(Logger testLogger, boolean redirectToOriginal);
237
238
/** Disable log capture and restore original factory */
239
static void disableCapture();
240
}
241
```
242
243
**Usage Examples:**
244
245
```java
246
import reactor.test.util.LoggerUtils;
247
248
@Test
249
public void testReactorInternalLogging() {
250
TestLogger testLogger = new TestLogger();
251
252
try {
253
// Capture Reactor's internal logs
254
LoggerUtils.enableCaptureWith(testLogger);
255
256
// Perform operations that generate internal logs
257
Flux.range(1, 10)
258
.log() // This will generate internal log messages
259
.subscribe();
260
261
// Verify internal logs were captured
262
String logOutput = testLogger.getOutContent();
263
assert logOutput.contains("onSubscribe");
264
assert logOutput.contains("request");
265
assert logOutput.contains("onNext");
266
assert logOutput.contains("onComplete");
267
268
} finally {
269
// Always restore original logging
270
LoggerUtils.disableCapture();
271
}
272
}
273
274
@Test
275
public void testLogCaptureWithRedirect() {
276
TestLogger testLogger = new TestLogger();
277
278
try {
279
// Capture and also redirect to original loggers
280
LoggerUtils.enableCaptureWith(testLogger, true);
281
282
// Operations will log to both test logger and console
283
Flux.just("test")
284
.doOnNext(v -> System.out.println("Processing: " + v))
285
.log()
286
.subscribe();
287
288
// Verify capture
289
assert !testLogger.getOutContent().isEmpty();
290
291
} finally {
292
LoggerUtils.disableCapture();
293
}
294
}
295
```
296
297
### Value Formatting
298
299
ValueFormatters provides utilities for custom value display in test output.
300
301
```java { .api }
302
class ValueFormatters {
303
/** Create class-specific formatter */
304
static <T> ToStringConverter forClass(Class<T> tClass, Function<T, String> tToString);
305
306
/** Create filtered class formatter */
307
static <T> ToStringConverter forClassMatching(
308
Class<T> tClass,
309
Predicate<T> tPredicate,
310
Function<T, String> tToString
311
);
312
313
/** Create predicate-based formatter */
314
static ToStringConverter filtering(
315
Predicate<Object> predicate,
316
Function<Object, String> anyToString
317
);
318
319
/** Get default Signal extractor */
320
static Extractor<Signal<?>> signalExtractor();
321
322
/** Get default Iterable extractor */
323
static Extractor<Iterable<?>> iterableExtractor();
324
325
/** Get array extractor for specific array type */
326
static <T> Extractor<T[]> arrayExtractor(Class<T[]> arrayClass);
327
328
/** Default Duration formatter */
329
ToStringConverter DURATION_CONVERTER;
330
}
331
332
@FunctionalInterface
333
interface ToStringConverter extends Function<Object, String> {}
334
335
@FunctionalInterface
336
interface Extractor<CONTAINER> extends Function<CONTAINER, Stream<?>> {}
337
```
338
339
**Usage Examples:**
340
341
```java
342
import reactor.test.ValueFormatters;
343
344
@Test
345
public void testCustomValueFormatting() {
346
// Custom formatter for Person objects
347
ToStringConverter personFormatter = ValueFormatters.forClass(
348
Person.class,
349
person -> String.format("Person{name='%s', age=%d}",
350
person.getName(), person.getAge())
351
);
352
353
// Custom formatter with filtering
354
ToStringConverter evenNumberFormatter = ValueFormatters.forClassMatching(
355
Integer.class,
356
n -> n % 2 == 0,
357
n -> "EVEN(" + n + ")"
358
);
359
360
// Use in StepVerifier options
361
StepVerifierOptions options = StepVerifierOptions.create()
362
.valueFormatter(personFormatter)
363
.valueFormatter(evenNumberFormatter);
364
365
StepVerifier.create(
366
Flux.just(new Person("Alice", 30), 42, new Person("Bob", 25), 17),
367
options
368
)
369
.expectNext(new Person("Alice", 30)) // Displayed as "Person{name='Alice', age=30}"
370
.expectNext(42) // Displayed as "EVEN(42)"
371
.expectNext(new Person("Bob", 25)) // Displayed as "Person{name='Bob', age=25}"
372
.expectNext(17) // Displayed as "17" (no special formatting)
373
.expectComplete()
374
.verify();
375
}
376
377
@Test
378
public void testExtractors() {
379
// Custom extractor for complex objects
380
Extractor<List<String>> listExtractor = list -> list.stream()
381
.map(s -> "Item: " + s);
382
383
StepVerifierOptions options = StepVerifierOptions.create()
384
.extractor(listExtractor);
385
386
// Use with StepVerifier for better error messages
387
List<String> testList = Arrays.asList("a", "b", "c");
388
389
StepVerifier.create(Flux.just(testList), options)
390
.expectNext(testList)
391
.expectComplete()
392
.verify();
393
}
394
```
395
396
## Integration Examples
397
398
### Comprehensive Testing Scenario
399
400
Combining multiple testing utilities for complex scenarios:
401
402
```java
403
@Test
404
public void testComplexReactiveWorkflow() {
405
TestLogger logger = new TestLogger();
406
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
407
408
try {
409
// Enable log capture
410
LoggerUtils.enableCaptureWith(logger);
411
412
// Create complex workflow with race conditions
413
AtomicInteger processedCount = new AtomicInteger(0);
414
PublisherProbe<String> fallbackProbe = PublisherProbe.empty();
415
416
Flux<String> workflow = Flux.interval(Duration.ofSeconds(1), vts)
417
.take(5)
418
.map(i -> "item-" + i)
419
.doOnNext(item -> {
420
// Simulate race condition
421
RaceTestUtils.race(
422
() -> processedCount.incrementAndGet(),
423
() -> logger.info("Processing: " + item)
424
);
425
})
426
.switchIfEmpty(fallbackProbe.flux())
427
.log();
428
429
TestSubscriber<String> subscriber = TestSubscriber.create();
430
workflow.subscribe(subscriber);
431
432
// Advance time to complete workflow
433
vts.advanceTimeBy(Duration.ofSeconds(6));
434
435
// Verify results
436
assert subscriber.isTerminatedComplete();
437
assert subscriber.getReceivedOnNext().size() == 5;
438
assert processedCount.get() == 5;
439
440
// Verify fallback was not used
441
fallbackProbe.assertWasNotSubscribed();
442
443
// Verify logging
444
String logOutput = logger.getOutContent();
445
assert logOutput.contains("Processing: item-0");
446
assert logOutput.contains("onComplete");
447
448
} finally {
449
LoggerUtils.disableCapture();
450
VirtualTimeScheduler.reset();
451
}
452
}
453
```