0
# Virtual Time Control
1
2
VirtualTimeScheduler enables time manipulation in reactive tests by replacing real time with a controllable virtual clock. This allows testing of time-based operations like delays, timeouts, and intervals without actual waiting.
3
4
## Capabilities
5
6
### Creating VirtualTimeSchedulers
7
8
Factory methods for creating VirtualTimeScheduler instances.
9
10
```java { .api }
11
class VirtualTimeScheduler implements Scheduler {
12
/** Create new VirtualTimeScheduler (not enabled globally) */
13
static VirtualTimeScheduler create();
14
15
/** Create with optional deferred time operations */
16
static VirtualTimeScheduler create(boolean defer);
17
}
18
```
19
20
**Usage Examples:**
21
22
```java
23
import reactor.test.scheduler.VirtualTimeScheduler;
24
25
// Create virtual time scheduler
26
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
27
28
// Use with specific operations
29
Flux<String> delayed = Flux.just("hello")
30
.delayElements(Duration.ofHours(1), vts);
31
32
// Time doesn't advance automatically - you control it
33
vts.advanceTimeBy(Duration.ofHours(1));
34
```
35
36
### Global Scheduler Control
37
38
Static methods for managing the global default scheduler replacement.
39
40
```java { .api }
41
class VirtualTimeScheduler {
42
/** Get existing or create and set as default scheduler */
43
static VirtualTimeScheduler getOrSet();
44
45
/** Get or create with deferred option */
46
static VirtualTimeScheduler getOrSet(boolean defer);
47
48
/** Set specific scheduler if none exists */
49
static VirtualTimeScheduler getOrSet(VirtualTimeScheduler scheduler);
50
51
/** Force set scheduler as default */
52
static VirtualTimeScheduler set(VirtualTimeScheduler scheduler);
53
54
/** Get current VirtualTimeScheduler (throws if not set) */
55
static VirtualTimeScheduler get() throws IllegalStateException;
56
57
/** Check if VTS is currently enabled in Schedulers factory */
58
static boolean isFactoryEnabled();
59
60
/** Reset to original schedulers */
61
static void reset();
62
}
63
```
64
65
**Usage Examples:**
66
67
```java
68
// Enable virtual time globally
69
VirtualTimeScheduler vts = VirtualTimeScheduler.getOrSet();
70
71
// Now all time-based operations use virtual time
72
Flux<String> delayed = Flux.just("hello")
73
.delayElements(Duration.ofMinutes(30)); // Uses virtual time
74
75
// Control time advancement
76
vts.advanceTimeBy(Duration.ofMinutes(30));
77
78
// Reset when done
79
VirtualTimeScheduler.reset();
80
81
// Check if virtual time is enabled
82
if (VirtualTimeScheduler.isFactoryEnabled()) {
83
// Virtual time operations
84
} else {
85
// Real time operations
86
}
87
```
88
89
### Time Control Methods
90
91
Methods for manipulating virtual time progression.
92
93
```java { .api }
94
class VirtualTimeScheduler {
95
/** Trigger all pending tasks at current virtual time */
96
void advanceTime();
97
98
/** Advance virtual clock by the specified duration */
99
void advanceTimeBy(Duration delayTime);
100
101
/** Advance virtual clock to specific instant */
102
void advanceTimeTo(Instant targetTime);
103
}
104
```
105
106
**Usage Examples:**
107
108
```java
109
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
110
111
// Schedule tasks at different times
112
vts.schedule(() -> System.out.println("Task 1"), 1, TimeUnit.HOURS);
113
vts.schedule(() -> System.out.println("Task 2"), 2, TimeUnit.HOURS);
114
vts.schedule(() -> System.out.println("Task 3"), 3, TimeUnit.HOURS);
115
116
// Initially no tasks execute
117
assert vts.getScheduledTaskCount() == 3;
118
119
// Advance by 1 hour - executes Task 1
120
vts.advanceTimeBy(Duration.ofHours(1));
121
assert vts.getScheduledTaskCount() == 2;
122
123
// Advance by another hour - executes Task 2
124
vts.advanceTimeBy(Duration.ofHours(1));
125
assert vts.getScheduledTaskCount() == 1;
126
127
// Advance to specific time - executes Task 3
128
vts.advanceTimeTo(Instant.now().plus(Duration.ofHours(3)));
129
assert vts.getScheduledTaskCount() == 0;
130
131
// Trigger any remaining tasks at current time
132
vts.advanceTime();
133
```
134
135
### Query Methods
136
137
Methods for inspecting scheduler state.
138
139
```java { .api }
140
class VirtualTimeScheduler {
141
/** Get number of currently scheduled tasks */
142
long getScheduledTaskCount();
143
}
144
```
145
146
### Scheduler Implementation
147
148
Standard Scheduler interface implementation for task scheduling.
149
150
```java { .api }
151
class VirtualTimeScheduler implements Scheduler {
152
/** Create new worker for this scheduler */
153
Worker createWorker();
154
155
/** Schedule immediate task */
156
Disposable schedule(Runnable task);
157
158
/** Schedule delayed task */
159
Disposable schedule(Runnable task, long delay, TimeUnit unit);
160
161
/** Schedule periodic task */
162
Disposable schedulePeriodically(
163
Runnable task,
164
long initialDelay,
165
long period,
166
TimeUnit unit
167
);
168
169
/** Get current virtual time */
170
long now(TimeUnit unit);
171
172
/** Check if scheduler is disposed */
173
boolean isDisposed();
174
175
/** Dispose scheduler and cancel all tasks */
176
void dispose();
177
}
178
```
179
180
## Integration with StepVerifier
181
182
VirtualTimeScheduler integrates seamlessly with StepVerifier for time-based testing:
183
184
```java
185
@Test
186
public void testDelayedSequence() {
187
StepVerifier.withVirtualTime(() ->
188
Flux.just("a", "b", "c")
189
.delayElements(Duration.ofMinutes(1))
190
)
191
.expectSubscription()
192
.expectNoEvent(Duration.ofMinutes(1)) // No events for 1 minute
193
.expectNext("a")
194
.expectNoEvent(Duration.ofMinutes(1)) // Wait another minute
195
.expectNext("b")
196
.expectNoEvent(Duration.ofMinutes(1)) // Wait another minute
197
.expectNext("c")
198
.expectComplete()
199
.verify();
200
}
201
202
@Test
203
public void testTimeout() {
204
StepVerifier.withVirtualTime(() ->
205
Flux.never().timeout(Duration.ofSeconds(5))
206
)
207
.expectSubscription()
208
.expectNoEvent(Duration.ofSeconds(5)) // Wait for timeout
209
.expectError(TimeoutException.class)
210
.verify();
211
}
212
213
@Test
214
public void testInterval() {
215
StepVerifier.withVirtualTime(() ->
216
Flux.interval(Duration.ofHours(1)).take(3)
217
)
218
.expectSubscription()
219
.expectNoEvent(Duration.ofHours(1))
220
.expectNext(0L)
221
.expectNoEvent(Duration.ofHours(1))
222
.expectNext(1L)
223
.expectNoEvent(Duration.ofHours(1))
224
.expectNext(2L)
225
.expectComplete()
226
.verify();
227
}
228
```
229
230
## Advanced Usage Patterns
231
232
### Manual Time Control
233
234
For complex time-based testing scenarios:
235
236
```java
237
@Test
238
public void testComplexTimeBasedBehavior() {
239
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
240
241
// Create time-sensitive publisher
242
Flux<String> timedFlux = Flux.interval(Duration.ofMinutes(10), vts)
243
.map(i -> "Event " + i)
244
.take(5);
245
246
TestSubscriber<String> subscriber = TestSubscriber.create();
247
timedFlux.subscribe(subscriber);
248
249
// No events initially
250
assert subscriber.getReceivedOnNext().isEmpty();
251
252
// Advance time and check events
253
vts.advanceTimeBy(Duration.ofMinutes(10));
254
assert subscriber.getReceivedOnNext().size() == 1;
255
assert subscriber.getReceivedOnNext().get(0).equals("Event 0");
256
257
vts.advanceTimeBy(Duration.ofMinutes(30)); // Advance 3 more intervals
258
assert subscriber.getReceivedOnNext().size() == 4;
259
260
vts.advanceTimeBy(Duration.ofMinutes(10)); // Final interval
261
assert subscriber.getReceivedOnNext().size() == 5;
262
assert subscriber.isTerminatedComplete();
263
}
264
```
265
266
### Testing Race Conditions with Time
267
268
Combine with other testing utilities for race condition testing:
269
270
```java
271
@Test
272
public void testTimeBasedRaceCondition() {
273
VirtualTimeScheduler vts = VirtualTimeScheduler.getOrSet();
274
275
try {
276
AtomicInteger counter = new AtomicInteger(0);
277
278
// Schedule competing tasks
279
Flux.interval(Duration.ofMillis(100), vts)
280
.take(10)
281
.subscribe(i -> counter.incrementAndGet());
282
283
Flux.interval(Duration.ofMillis(150), vts)
284
.take(7)
285
.subscribe(i -> counter.addAndGet(2));
286
287
// Advance time to let all tasks complete
288
vts.advanceTimeBy(Duration.ofSeconds(2));
289
290
// Verify expected interactions
291
assert counter.get() == 10 + (7 * 2); // 10 from first, 14 from second
292
293
} finally {
294
VirtualTimeScheduler.reset();
295
}
296
}
297
```
298
299
### Testing Backpressure with Time
300
301
Test backpressure behavior in time-based scenarios:
302
303
```java
304
@Test
305
public void testBackpressureWithTime() {
306
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
307
308
// Fast producer with backpressure
309
Flux<Long> fastProducer = Flux.interval(Duration.ofMillis(10), vts)
310
.onBackpressureBuffer(5); // Small buffer
311
312
TestSubscriber<Long> slowSubscriber = TestSubscriber.builder()
313
.initialRequest(1) // Slow consumer
314
.build();
315
316
fastProducer.subscribe(slowSubscriber);
317
318
// Advance time quickly - should cause backpressure
319
vts.advanceTimeBy(Duration.ofMillis(100)); // Would generate 10 items
320
321
// But subscriber only requested 1
322
assert slowSubscriber.getReceivedOnNext().size() == 1;
323
324
// Request more
325
slowSubscriber.request(3);
326
assert slowSubscriber.getReceivedOnNext().size() == 4;
327
328
// Continue advancing time
329
vts.advanceTimeBy(Duration.ofMillis(50));
330
slowSubscriber.request(10);
331
332
// Verify buffer overflow handling
333
List<String> protocolErrors = slowSubscriber.getProtocolErrors();
334
// May contain backpressure-related errors if buffer overflowed
335
}
336
```
337
338
### Cleanup and Resource Management
339
340
Proper cleanup when using VirtualTimeScheduler:
341
342
```java
343
@Test
344
public void testWithProperCleanup() {
345
VirtualTimeScheduler vts = null;
346
347
try {
348
vts = VirtualTimeScheduler.getOrSet();
349
350
// Test time-based operations
351
Flux<String> delayed = Flux.just("test")
352
.delayElements(Duration.ofSeconds(1));
353
354
StepVerifier.create(delayed)
355
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(1)))
356
.expectNext("test")
357
.expectComplete()
358
.verify();
359
360
} finally {
361
// Always reset to avoid affecting other tests
362
VirtualTimeScheduler.reset();
363
364
// Dispose if created manually
365
if (vts != null && !vts.isDisposed()) {
366
vts.dispose();
367
}
368
}
369
}
370
```