or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · publisher-probe.md · step-verifier.md · test-publisher.md · test-subscriber.md · testing-utilities.md · virtual-time.md

virtual-time.md (docs/)

0

# Virtual Time Control

1

2

VirtualTimeScheduler enables time manipulation in reactive tests by replacing real time with a controllable virtual clock. This allows testing of time-based operations like delays, timeouts, and intervals without actual waiting.

3

4

## Capabilities

5

6

### Creating VirtualTimeSchedulers

7

8

Factory methods for creating VirtualTimeScheduler instances.

9

10

```java { .api }

11

class VirtualTimeScheduler implements Scheduler {

12

/** Create new VirtualTimeScheduler (not enabled globally) */

13

static VirtualTimeScheduler create();

14

15

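/** Create new VirtualTimeScheduler (not enabled globally); when defer is true, clock move operations are deferred until there are tasks in the queue */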

16

static VirtualTimeScheduler create(boolean defer);

17

}

18

```

19

20

**Usage Examples:**

21

22

```java

23

import reactor.test.scheduler.VirtualTimeScheduler;

24

25

// Create virtual time scheduler

26

VirtualTimeScheduler vts = VirtualTimeScheduler.create();

27

28

// Use with specific operations

29

Flux<String> delayed = Flux.just("hello")

30

.delayElements(Duration.ofHours(1), vts);

31

32

// Time doesn't advance automatically - you control it

33

vts.advanceTimeBy(Duration.ofHours(1));

34

```

35

36

### Global Scheduler Control

37

38

Static methods for managing the global default scheduler replacement.

39

40

```java { .api }

41

class VirtualTimeScheduler {

42

/** Get existing or create and set as default scheduler */

43

static VirtualTimeScheduler getOrSet();

44

45

/** Get or create with deferred option */

46

static VirtualTimeScheduler getOrSet(boolean defer);

47

48

/** Set specific scheduler if none exists */

49

static VirtualTimeScheduler getOrSet(VirtualTimeScheduler scheduler);

50

51

/** Force set scheduler as default */

52

static VirtualTimeScheduler set(VirtualTimeScheduler scheduler);

53

54

/** Get current VirtualTimeScheduler (throws if not set) */

55

static VirtualTimeScheduler get() throws IllegalStateException;

56

57

/** Check if VTS is currently enabled in Schedulers factory */

58

static boolean isFactoryEnabled();

59

60

/** Reset to original schedulers */

61

static void reset();

62

}

63

```

64

65

**Usage Examples:**

66

67

```java

68

// Enable virtual time globally

69

VirtualTimeScheduler vts = VirtualTimeScheduler.getOrSet();

70

71

// Now all time-based operations use virtual time

72

Flux<String> delayed = Flux.just("hello")

73

.delayElements(Duration.ofMinutes(30)); // Uses virtual time

74

75

// Control time advancement

76

vts.advanceTimeBy(Duration.ofMinutes(30));

77

78

// Reset when done

79

VirtualTimeScheduler.reset();

80

81

// Check if virtual time is enabled

82

if (VirtualTimeScheduler.isFactoryEnabled()) {

83

// Virtual time operations

84

} else {

85

// Real time operations

86

}

87

```

88

89

### Time Control Methods

90

91

Methods for manipulating virtual time progression.

92

93

```java { .api }

94

class VirtualTimeScheduler {

95

/** Trigger all pending tasks at current virtual time */

96

void advanceTime();

97

98

/** Advance virtual clock by the specified duration */

99

void advanceTimeBy(Duration delayTime);

100

101

/** Advance virtual clock to specific instant */

102

void advanceTimeTo(Instant targetTime);

103

}

104

```

105

106

**Usage Examples:**

107

108

```java

109

VirtualTimeScheduler vts = VirtualTimeScheduler.create();

110

111

// Schedule tasks at different times

112

vts.schedule(() -> System.out.println("Task 1"), 1, TimeUnit.HOURS);

113

vts.schedule(() -> System.out.println("Task 2"), 2, TimeUnit.HOURS);

114

vts.schedule(() -> System.out.println("Task 3"), 3, TimeUnit.HOURS);

115

116

// Initially no tasks execute

117

assert vts.getScheduledTaskCount() == 3;

118

119

// Advance by 1 hour - executes Task 1

120

vts.advanceTimeBy(Duration.ofHours(1));

121

assert vts.getScheduledTaskCount() == 2;

122

123

// Advance by another hour - executes Task 2

124

vts.advanceTimeBy(Duration.ofHours(1));

125

assert vts.getScheduledTaskCount() == 1;

126

127

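// Advance to a specific instant - executes Task 3 (the virtual clock starts at the epoch, so Instant.now() + 3h is far past every scheduled task)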

128

vts.advanceTimeTo(Instant.now().plus(Duration.ofHours(3)));

129

assert vts.getScheduledTaskCount() == 0;

130

131

// Trigger any remaining tasks at current time

132

vts.advanceTime();

133

```

134

135

### Query Methods

136

137

Methods for inspecting scheduler state.

138

139

```java { .api }

140

class VirtualTimeScheduler {

141

/** Get number of currently scheduled tasks */

142

long getScheduledTaskCount();

143

}

144

```

145

146

### Scheduler Implementation

147

148

Standard Scheduler interface implementation for task scheduling.

149

150

```java { .api }

151

class VirtualTimeScheduler implements Scheduler {

152

/** Create new worker for this scheduler */

153

Worker createWorker();

154

155

/** Schedule immediate task */

156

Disposable schedule(Runnable task);

157

158

/** Schedule delayed task */

159

Disposable schedule(Runnable task, long delay, TimeUnit unit);

160

161

/** Schedule periodic task */

162

Disposable schedulePeriodically(

163

Runnable task,

164

long initialDelay,

165

long period,

166

TimeUnit unit

167

);

168

169

/** Get current virtual time */

170

long now(TimeUnit unit);

171

172

/** Check if scheduler is disposed */

173

boolean isDisposed();

174

175

/** Dispose scheduler and cancel all tasks */

176

void dispose();

177

}

178

```

179

180

## Integration with StepVerifier

181

182

VirtualTimeScheduler integrates seamlessly with StepVerifier for time-based testing:

183

184

```java

185

@Test

186

public void testDelayedSequence() {

187

StepVerifier.withVirtualTime(() ->

188

Flux.just("a", "b", "c")

189

.delayElements(Duration.ofMinutes(1))

190

)

191

.expectSubscription()

192

.expectNoEvent(Duration.ofMinutes(1)) // No events for 1 minute

193

.expectNext("a")

194

.expectNoEvent(Duration.ofMinutes(1)) // Wait another minute

195

.expectNext("b")

196

.expectNoEvent(Duration.ofMinutes(1)) // Wait another minute

197

.expectNext("c")

198

.expectComplete()

199

.verify();

200

}

201

202

@Test

203

public void testTimeout() {

204

StepVerifier.withVirtualTime(() ->

205

Flux.never().timeout(Duration.ofSeconds(5))

206

)

207

.expectSubscription()

208

.expectNoEvent(Duration.ofSeconds(5)) // Wait for timeout

209

.expectError(TimeoutException.class)

210

.verify();

211

}

212

213

@Test

214

public void testInterval() {

215

StepVerifier.withVirtualTime(() ->

216

Flux.interval(Duration.ofHours(1)).take(3)

217

)

218

.expectSubscription()

219

.expectNoEvent(Duration.ofHours(1))

220

.expectNext(0L)

221

.expectNoEvent(Duration.ofHours(1))

222

.expectNext(1L)

223

.expectNoEvent(Duration.ofHours(1))

224

.expectNext(2L)

225

.expectComplete()

226

.verify();

227

}

228

```

229

230

## Advanced Usage Patterns

231

232

### Manual Time Control

233

234

For complex time-based testing scenarios:

235

236

```java

237

@Test

238

public void testComplexTimeBasedBehavior() {

239

VirtualTimeScheduler vts = VirtualTimeScheduler.create();

240

241

// Create time-sensitive publisher

242

Flux<String> timedFlux = Flux.interval(Duration.ofMinutes(10), vts)

243

.map(i -> "Event " + i)

244

.take(5);

245

246

TestSubscriber<String> subscriber = TestSubscriber.create();

247

timedFlux.subscribe(subscriber);

248

249

// No events initially

250

assert subscriber.getReceivedOnNext().isEmpty();

251

252

// Advance time and check events

253

vts.advanceTimeBy(Duration.ofMinutes(10));

254

assert subscriber.getReceivedOnNext().size() == 1;

255

assert subscriber.getReceivedOnNext().get(0).equals("Event 0");

256

257

vts.advanceTimeBy(Duration.ofMinutes(30)); // Advance 3 more intervals

258

assert subscriber.getReceivedOnNext().size() == 4;

259

260

vts.advanceTimeBy(Duration.ofMinutes(10)); // Final interval

261

assert subscriber.getReceivedOnNext().size() == 5;

262

assert subscriber.isTerminatedComplete();

263

}

264

```

265

266

### Testing Race Conditions with Time

267

268

Combine with other testing utilities for race condition testing:

269

270

```java

271

@Test

272

public void testTimeBasedRaceCondition() {

273

VirtualTimeScheduler vts = VirtualTimeScheduler.getOrSet();

274

275

try {

276

AtomicInteger counter = new AtomicInteger(0);

277

278

// Schedule competing tasks

279

Flux.interval(Duration.ofMillis(100), vts)

280

.take(10)

281

.subscribe(i -> counter.incrementAndGet());

282

283

Flux.interval(Duration.ofMillis(150), vts)

284

.take(7)

285

.subscribe(i -> counter.addAndGet(2));

286

287

// Advance time to let all tasks complete

288

vts.advanceTimeBy(Duration.ofSeconds(2));

289

290

// Verify expected interactions

291

assert counter.get() == 10 + (7 * 2); // 10 from first, 14 from second

292

293

} finally {

294

VirtualTimeScheduler.reset();

295

}

296

}

297

```

298

299

### Testing Backpressure with Time

300

301

Test backpressure behavior in time-based scenarios:

302

303

```java

304

@Test

305

public void testBackpressureWithTime() {

306

VirtualTimeScheduler vts = VirtualTimeScheduler.create();

307

308

// Fast producer with backpressure

309

Flux<Long> fastProducer = Flux.interval(Duration.ofMillis(10), vts)

310

.onBackpressureBuffer(5); // Small buffer

311

312

TestSubscriber<Long> slowSubscriber = TestSubscriber.builder()

313

.initialRequest(1) // Slow consumer

314

.build();

315

316

fastProducer.subscribe(slowSubscriber);

317

318

// Advance time quickly - should cause backpressure

319

vts.advanceTimeBy(Duration.ofMillis(100)); // Would generate 10 items

320

321

// But subscriber only requested 1

322

assert slowSubscriber.getReceivedOnNext().size() == 1;

323

324

// Request more

325

slowSubscriber.request(3);

326

assert slowSubscriber.getReceivedOnNext().size() == 4;

327

328

// Continue advancing time

329

vts.advanceTimeBy(Duration.ofMillis(50));

330

slowSubscriber.request(10);

331

332

// Verify buffer overflow handling

333

List<String> protocolErrors = slowSubscriber.getProtocolErrors();

334

// May contain backpressure-related errors if buffer overflowed

335

}

336

```

337

338

### Cleanup and Resource Management

339

340

Proper cleanup when using VirtualTimeScheduler:

341

342

```java

343

@Test

344

public void testWithProperCleanup() {

345

VirtualTimeScheduler vts = null;

346

347

try {

348

vts = VirtualTimeScheduler.getOrSet();

349

350

// Test time-based operations

351

Flux<String> delayed = Flux.just("test")

352

.delayElements(Duration.ofSeconds(1));

353

354

StepVerifier.create(delayed)

355

.then(() -> vts.advanceTimeBy(Duration.ofSeconds(1)))

356

.expectNext("test")

357

.expectComplete()

358

.verify();

359

360

} finally {

361

// Always reset to avoid affecting other tests

362

VirtualTimeScheduler.reset();

363

364

// Dispose if created manually

365

if (vts != null && !vts.isDisposed()) {

366

vts.dispose();

367

}

368

}

369

}

370

```