or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdpublisher-probe.mdstep-verifier.mdtest-publisher.mdtest-subscriber.mdtesting-utilities.mdvirtual-time.md

test-subscriber.mddocs/

0

# Manual Subscribers

1

2

TestSubscriber provides a CoreSubscriber implementation for runtime event assertion, offering an alternative to StepVerifier for complex testing scenarios requiring programmatic access to received signals and flexible assertion patterns.

3

4

## Capabilities

5

6

### Creating TestSubscribers

7

8

Factory methods and builder for creating TestSubscriber instances.

9

10

```java { .api }

11

interface TestSubscriber<T> extends CoreSubscriber<T>, Scannable {

12

/** Create simple TestSubscriber with unbounded demand */

13

static <T> TestSubscriber<T> create();

14

15

/** Create TestSubscriberBuilder for customization */

16

static TestSubscriberBuilder builder();

17

}

18

19

class TestSubscriberBuilder {

20

/** Add context entry */

21

TestSubscriberBuilder contextPut(Object key, Object value);

22

23

/** Add multiple context entries */

24

TestSubscriberBuilder contextPutAll(ContextView toAdd);

25

26

/** Set initial request amount */

27

TestSubscriberBuilder initialRequest(long initialRequest);

28

29

/** Set unbounded initial request */

30

TestSubscriberBuilder initialRequestUnbounded();

31

32

/** Require specific fusion mode */

33

TestSubscriberBuilder requireFusion(int exactMode);

34

35

/** Require fusion negotiation result */

36

TestSubscriberBuilder requireFusion(int requestedMode, int negotiatedMode);

37

38

/** Require non-fuseable subscription */

39

TestSubscriberBuilder requireNotFuseable();

40

41

/** Build standard TestSubscriber */

42

<T> TestSubscriber<T> build();

43

44

/** Build ConditionalTestSubscriber */

45

<T> ConditionalTestSubscriber<T> buildConditional(Predicate<? super T> tryOnNext);

46

}

47

```

48

49

**Usage Examples:**

50

51

```java

52

import reactor.test.subscriber.TestSubscriber;

53

54

// Simple subscriber with unbounded demand

55

TestSubscriber<String> subscriber = TestSubscriber.create();

56

Flux.just("hello", "world").subscribe(subscriber);

57

58

// Custom subscriber with limited initial request

59

TestSubscriber<Integer> limitedSubscriber = TestSubscriber.builder()

60

.initialRequest(5)

61

.build();

62

63

Flux.range(1, 100).subscribe(limitedSubscriber);

64

65

// Subscriber with context

66

TestSubscriber<String> contextSubscriber = TestSubscriber.builder()

67

.contextPut("userId", "12345")

68

.contextPut("requestId", "abc-def")

69

.build();

70

71

// Subscriber with fusion requirements

72

TestSubscriber<String> fusionSubscriber = TestSubscriber.builder()

73

.requireFusion(Fuseable.SYNC) // Require synchronous fusion

74

.build();

75

```

76

77

### Control Methods

78

79

Methods for controlling subscription behavior and backpressure.

80

81

```java { .api }

82

interface TestSubscriber<T> {

83

/** Cancel subscription and unblock pending block() calls */

84

void cancel();

85

86

/** Request additional elements from publisher */

87

void request(long additionalRequest);

88

}

89

```

90

91

**Usage Examples:**

92

93

```java

94

TestSubscriber<Integer> subscriber = TestSubscriber.builder()

95

.initialRequest(0) // Start with no demand

96

.build();

97

98

Flux.range(1, 10).subscribe(subscriber);

99

100

// Control backpressure manually

101

subscriber.request(3); // Request first 3 elements

102

// Process received elements...

103

subscriber.request(5); // Request 5 more

104

// Process more elements...

105

subscriber.cancel(); // Cancel remaining

106

```

107

108

### State Query Methods

109

110

Methods for checking subscriber state and termination status.

111

112

```java { .api }

113

interface TestSubscriber<T> {

114

/** Check if subscriber reached any end state (terminated or cancelled) */

115

boolean isTerminatedOrCancelled();

116

117

/** Check if subscriber received terminal signal (onComplete or onError) */

118

boolean isTerminated();

119

120

/** Check if subscriber received onComplete */

121

boolean isTerminatedComplete();

122

123

/** Check if subscriber received onError */

124

boolean isTerminatedError();

125

126

/** Check if subscriber was cancelled */

127

boolean isCancelled();

128

}

129

```

130

131

**Usage Examples:**

132

133

```java

134

TestSubscriber<String> subscriber = TestSubscriber.create();

135

136

// Before subscription

137

assert !subscriber.isTerminated();

138

assert !subscriber.isCancelled();

139

140

// Subscribe to completing publisher

141

Flux.just("hello").subscribe(subscriber);

142

143

// After completion

144

assert subscriber.isTerminated();

145

assert subscriber.isTerminatedComplete();

146

assert !subscriber.isTerminatedError();

147

assert !subscriber.isCancelled();

148

149

// Test error case

150

TestSubscriber<String> errorSubscriber = TestSubscriber.create();

151

Flux.<String>error(new RuntimeException()).subscribe(errorSubscriber);

152

153

assert errorSubscriber.isTerminated();

154

assert !errorSubscriber.isTerminatedComplete();

155

assert errorSubscriber.isTerminatedError();

156

```

157

158

### Data Access Methods

159

160

Methods for accessing received signals and protocol violations.

161

162

```java { .api }

163

interface TestSubscriber<T> {

164

/** Get terminal signal if available (nullable) */

165

@Nullable

166

Signal<T> getTerminalSignal();

167

168

/** Assert terminated and get terminal signal (throws if not terminated) */

169

Signal<T> expectTerminalSignal();

170

171

/** Assert error terminated and get error (throws if not error) */

172

Throwable expectTerminalError();

173

174

/** Get all received onNext values */

175

List<T> getReceivedOnNext();

176

177

/** Get onNext values received after cancellation (protocol violations) */

178

List<T> getReceivedOnNextAfterCancellation();

179

180

/** Get detected protocol violations from publisher */

181

List<String> getProtocolErrors();

182

183

/** Get negotiated fusion mode */

184

int getFusionMode();

185

}

186

```

187

188

**Usage Examples:**

189

190

```java

191

TestSubscriber<Integer> subscriber = TestSubscriber.create();

192

Flux.range(1, 5).subscribe(subscriber);

193

194

// Access received data

195

List<Integer> values = subscriber.getReceivedOnNext();

196

assert values.equals(Arrays.asList(1, 2, 3, 4, 5));

197

198

// Check terminal signal

199

Signal<Integer> terminal = subscriber.expectTerminalSignal();

200

assert terminal.isOnComplete();

201

202

// Test error case

203

TestSubscriber<String> errorSubscriber = TestSubscriber.create();

204

RuntimeException error = new RuntimeException("Test error");

205

Flux.<String>error(error).subscribe(errorSubscriber);

206

207

Throwable receivedError = errorSubscriber.expectTerminalError();

208

assert receivedError == error;

209

210

// Test protocol violations

211

TestSubscriber<String> violationSubscriber = TestSubscriber.create();

212

// ... publisher that violates protocol ...

213

List<String> violations = violationSubscriber.getProtocolErrors();

214

assert !violations.isEmpty();

215

```

216

217

### Blocking Methods

218

219

Methods for blocking until subscriber reaches terminal state.

220

221

```java { .api }

222

interface TestSubscriber<T> {

223

/** Block until end state reached (terminated or cancelled) */

224

void block();

225

226

/** Block until end state or timeout */

227

void block(Duration timeout);

228

}

229

```

230

231

**Usage Examples:**

232

233

```java

234

TestSubscriber<String> subscriber = TestSubscriber.create();

235

236

// Subscribe to async publisher

237

Flux.just("hello")

238

.delayElements(Duration.ofMillis(100))

239

.subscribe(subscriber);

240

241

// Block until completion

242

subscriber.block(); // Waits for async completion

243

assert subscriber.isTerminatedComplete();

244

245

// Block with timeout

246

TestSubscriber<String> timeoutSubscriber = TestSubscriber.create();

247

Flux.never().subscribe(timeoutSubscriber); // Publisher that never emits

248

249

try {

250

timeoutSubscriber.block(Duration.ofMillis(50));

251

assert false; // Should not reach here

252

} catch (IllegalStateException e) {

253

// Expected timeout

254

assert e.getMessage().contains("timeout");

255

}

256

```

257

258

### ConditionalTestSubscriber

259

260

Specialized subscriber for conditional subscription scenarios.

261

262

```java { .api }

263

interface ConditionalTestSubscriber<T> extends TestSubscriber<T>, Fuseable.ConditionalSubscriber<T> {

264

// Inherits all TestSubscriber methods plus:

265

266

/** Try to consume next value (from ConditionalSubscriber) */

267

boolean tryOnNext(T value);

268

}

269

```

270

271

**Usage Examples:**

272

273

```java

274

// Create conditional subscriber that only accepts even numbers

275

ConditionalTestSubscriber<Integer> conditionalSub = TestSubscriber.builder()

276

.buildConditional(n -> n % 2 == 0);

277

278

// Subscribe to publisher

279

Flux.range(1, 10).subscribe(conditionalSub);

280

281

// Only even numbers are received

282

List<Integer> received = conditionalSub.getReceivedOnNext();

283

assert received.equals(Arrays.asList(2, 4, 6, 8, 10));

284

```

285

286

## Types

287

288

```java { .api }

289

// Fusion capability expectations for TestSubscriber

290

enum TestSubscriber.FusionRequirement {

291

/** Expect publisher to be fuseable */

292

FUSEABLE,

293

294

/** Expect publisher to not be fuseable */

295

NOT_FUSEABLE,

296

297

/** No fusion requirements */

298

NONE

299

}

300

301

// Fusion modes (from Reactor Fuseable interface)

302

interface Fuseable {

303

int NONE = 0; // No fusion support

304

int SYNC = 1; // Synchronous fusion

305

int ASYNC = 2; // Asynchronous fusion

306

int ANY = 3; // Any fusion mode

307

int THREAD_BARRIER = 4; // Thread barrier

308

}

309

```

310

311

## Advanced Usage Patterns

312

313

### Complex Assertion Scenarios

314

315

TestSubscriber is ideal for scenarios requiring complex assertions that don't fit StepVerifier's declarative model:

316

317

```java

318

@Test

319

public void testComplexDataValidation() {

320

TestSubscriber<DataEvent> subscriber = TestSubscriber.create();

321

322

// Subscribe to complex data stream

323

complexDataService.getEvents().subscribe(subscriber);

324

325

// Block until completion

326

subscriber.block(Duration.ofSeconds(5));

327

328

// Perform complex validations

329

List<DataEvent> events = subscriber.getReceivedOnNext();

330

331

// Validate event ordering

332

for (int i = 1; i < events.size(); i++) {

333

assert events.get(i).getTimestamp() >= events.get(i-1).getTimestamp();

334

}

335

336

// Validate event types

337

Map<EventType, Long> typeCounts = events.stream()

338

.collect(Collectors.groupingBy(DataEvent::getType, Collectors.counting()));

339

340

assert typeCounts.get(EventType.START) == 1;

341

assert typeCounts.get(EventType.END) == 1;

342

assert typeCounts.get(EventType.DATA) >= 1;

343

}

344

```

345

346

### Manual Backpressure Testing

347

348

Test backpressure behavior by controlling requests manually:

349

350

```java

351

@Test

352

public void testBackpressureBehavior() {

353

TestSubscriber<Integer> subscriber = TestSubscriber.builder()

354

.initialRequest(0) // No initial demand

355

.build();

356

357

// Subscribe to fast producer

358

Flux.range(1, 1000)

359

.onBackpressureBuffer(10) // Limited buffer

360

.subscribe(subscriber);

361

362

// Verify no data received initially

363

assert subscriber.getReceivedOnNext().isEmpty();

364

365

// Request small batch

366

subscriber.request(5);

367

// Verify exactly 5 received

368

assert subscriber.getReceivedOnNext().size() == 5;

369

370

// Request more

371

subscriber.request(3);

372

assert subscriber.getReceivedOnNext().size() == 8;

373

374

// Cancel to test cleanup

375

subscriber.cancel();

376

assert subscriber.isCancelled();

377

}

378

```

379

380

### Protocol Violation Detection

381

382

Detect and assert on reactive streams protocol violations:

383

384

```java

385

@Test

386

public void testProtocolViolations() {

387

TestSubscriber<String> subscriber = TestSubscriber.create();

388

389

// Create misbehaving publisher

390

Publisher<String> badPublisher = s -> {

391

s.onSubscribe(new Subscription() {

392

@Override

393

public void request(long n) {}

394

395

@Override

396

public void cancel() {}

397

});

398

399

s.onNext("value");

400

s.onComplete();

401

s.onNext("after complete"); // Protocol violation!

402

};

403

404

badPublisher.subscribe(subscriber);

405

406

// Check for protocol violations

407

List<String> violations = subscriber.getProtocolErrors();

408

assert !violations.isEmpty();

409

assert violations.stream()

410

.anyMatch(v -> v.contains("onNext after terminal"));

411

412

// Check that violation was captured

413

List<String> afterCancel = subscriber.getReceivedOnNextAfterCancellation();

414

assert afterCancel.contains("after complete");

415

}

416

```