
# Publisher Instrumentation

PublisherProbe provides instrumentation for Publishers to capture subscription events and verify control flow without affecting the actual data flow. It's ideal for testing conditional logic and verifying that certain code paths are executed.

## Capabilities

### Creating PublisherProbes

Factory methods for creating PublisherProbe instances.

```java { .api }
interface PublisherProbe<T> {
    /** Create probe wrapping existing publisher */
    static <T> PublisherProbe<T> of(Publisher<? extends T> source);

    /** Create probe that simply completes (empty source) */
    static <T> PublisherProbe<T> empty();
}
```

**Usage Examples:**

```java
import reactor.core.publisher.Flux;
import reactor.test.publisher.PublisherProbe;

// Probe an existing publisher
Flux<String> source = Flux.just("hello", "world");
PublisherProbe<String> probe = PublisherProbe.of(source);

// Use probe in place of original publisher
probe.flux().subscribe(System.out::println);

// Verify subscription occurred
assert probe.wasSubscribed();

// Create empty probe for conditional paths
PublisherProbe<String> emptyProbe = PublisherProbe.empty();
// Use in conditional logic where you want to verify execution path
```

### Conversion Methods

Convert PublisherProbe to standard Reactor types.

```java { .api }
interface PublisherProbe<T> {
    /** Get Mono version of probe */
    Mono<T> mono();

    /** Get Flux version of probe */
    Flux<T> flux();
}
```
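
As a rough illustration of when each view is useful: `mono()` suits code that accepts or returns a `Mono`, while `flux()` suits multi-element pipelines. The sketch below is a minimal example under stated assumptions; `saveOrAudit` and the class name `ProbeConversionSketch` are hypothetical and not part of reactor-test.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.publisher.PublisherProbe;

class ProbeConversionSketch {

    // Hypothetical method under test: picks one of two completion signals.
    static Mono<Void> saveOrAudit(boolean valid, Mono<Void> save, Mono<Void> audit) {
        return valid ? save : audit;
    }

    // Returns {saveSubscribed, auditSubscribed} after exercising the "valid" branch.
    static boolean[] monoView() {
        PublisherProbe<Void> save = PublisherProbe.empty();
        PublisherProbe<Void> audit = PublisherProbe.empty();

        // mono() lets the probes stand in for Mono<Void> arguments
        StepVerifier.create(saveOrAudit(true, save.mono(), audit.mono()))
                .verifyComplete();

        return new boolean[] { save.wasSubscribed(), audit.wasSubscribed() };
    }

    // Returns whether the probe was subscribed through its flux() view.
    static boolean fluxView() {
        PublisherProbe<String> probe = PublisherProbe.of(Flux.just("a", "b"));

        // flux() relays the elements of the wrapped source
        StepVerifier.create(probe.flux())
                .expectNext("a", "b")
                .verifyComplete();

        return probe.wasSubscribed();
    }

    public static void main(String[] args) {
        boolean[] branches = monoView();
        System.out.println("save subscribed: " + branches[0]);
        System.out.println("audit subscribed: " + branches[1]);
        System.out.println("flux view subscribed: " + fluxView());
    }
}
```

Because the probes produce no values, `Mono<Void>` return types can be tested for the branch taken even though there is no data to assert on.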

### State Query Methods

Methods for querying subscription and interaction state.

```java { .api }
interface PublisherProbe<T> {
    /** Check if probe was subscribed to */
    boolean wasSubscribed();

    /** Get total number of subscriptions */
    long subscribeCount();

    /** Check if probe was cancelled */
    boolean wasCancelled();

    /** Check if probe received any requests */
    boolean wasRequested();
}
```

**Usage Examples:**

```java
// Flux.never() keeps the subscription open so that dispose() actually cancels it;
// a source that has already completed would not register a cancellation.
PublisherProbe<String> probe = PublisherProbe.of(Flux.never());

// Before subscription
assert !probe.wasSubscribed();
assert probe.subscribeCount() == 0;
assert !probe.wasCancelled();
assert !probe.wasRequested();

// Subscribe and check state
Disposable subscription = probe.flux().subscribe();

assert probe.wasSubscribed();
assert probe.subscribeCount() == 1;
assert !probe.wasCancelled();
assert probe.wasRequested();

// Cancel and check state
subscription.dispose();
assert probe.wasCancelled();

// Multiple subscriptions
probe.flux().subscribe();
assert probe.subscribeCount() == 2; // Now has 2 total subscriptions
```
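
The query methods report independent events, so a probe can be subscribed and cancelled without ever being requested. The following is a minimal sketch of that case, assuming a `StepVerifier` created with an initial request of 0 issues no demand before cancelling; the class name `ProbeDemandSketch` is hypothetical.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.test.publisher.PublisherProbe;

class ProbeDemandSketch {

    // Subscribes with zero initial demand, cancels, and returns the probe for inspection.
    static PublisherProbe<String> subscribeWithoutDemand() {
        PublisherProbe<String> probe = PublisherProbe.of(Flux.just("a", "b"));

        StepVerifier.create(probe.flux(), 0) // initial request of 0: no demand is signalled
                .expectSubscription()
                .thenCancel()
                .verify();

        return probe;
    }

    public static void main(String[] args) {
        PublisherProbe<String> probe = subscribeWithoutDemand();
        System.out.println("subscribed: " + probe.wasSubscribed());
        System.out.println("requested:  " + probe.wasRequested());
        System.out.println("cancelled:  " + probe.wasCancelled());
    }
}
```

Checking `wasRequested()` separately from `wasSubscribed()` is useful when testing backpressure-aware code that should subscribe eagerly but defer demand.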

### Assertion Methods

Assertion methods that throw exceptions on failure for test integration.

```java { .api }
interface PublisherProbe<T> {
    /** Assert probe was subscribed (throws if not) */
    void assertWasSubscribed();

    /** Assert probe was not subscribed (throws if was) */
    void assertWasNotSubscribed();

    /** Assert probe was cancelled (throws if not) */
    void assertWasCancelled();

    /** Assert probe was not cancelled (throws if was) */
    void assertWasNotCancelled();

    /** Assert probe received requests (throws if not) */
    void assertWasRequested();

    /** Assert probe received no requests (throws if did) */
    void assertWasNotRequested();
}
```

**Usage Examples:**

```java
PublisherProbe<String> probe = PublisherProbe.empty();
boolean someCondition = true; // Stand-in for the condition under test

// Test conditional execution paths
if (someCondition) {
    probe.flux().subscribe(); // Conditional subscription
}

// Assert the condition was met
probe.assertWasSubscribed(); // Throws AssertionError if the probe was never subscribed

// Test error handling paths: the source must emit a value for map() to run and fail
PublisherProbe<String> failingProbe = PublisherProbe.of(Flux.just("value"));
failingProbe.flux()
    .map(s -> { throw new RuntimeException("boom"); })
    .subscribe(value -> {}, error -> { /* Handle error */ });

failingProbe.assertWasSubscribed(); // Verify subscription occurred despite error
```

## Common Use Cases

### Testing Conditional Logic

PublisherProbe is excellent for verifying that conditional code paths are executed:

```java
@Test
public void testConditionalExecution() {
    PublisherProbe<String> thenProbe = PublisherProbe.empty();
    PublisherProbe<String> elseProbe = PublisherProbe.empty();

    boolean condition = true;

    Flux<String> result = condition
        ? thenProbe.flux()
        : elseProbe.flux();

    StepVerifier.create(result)
        .expectComplete()
        .verify();

    // Verify correct branch was taken
    thenProbe.assertWasSubscribed();
    elseProbe.assertWasNotSubscribed();
}
```

### Testing switchIfEmpty Behavior

Verify that fallback publishers are used when main publishers are empty:

```java
@Test
public void testSwitchIfEmpty() {
    Flux<String> empty = Flux.empty();
    PublisherProbe<String> fallback = PublisherProbe.of(Flux.just("fallback"));

    StepVerifier.create(
        empty.switchIfEmpty(fallback.flux())
    )
    .expectNext("fallback")
    .expectComplete()
    .verify();

    fallback.assertWasSubscribed(); // Verify fallback was used
}

@Test
public void testSwitchIfEmptyNotUsed() {
    Flux<String> nonEmpty = Flux.just("primary");
    PublisherProbe<String> fallback = PublisherProbe.of(Flux.just("fallback"));

    StepVerifier.create(
        nonEmpty.switchIfEmpty(fallback.flux())
    )
    .expectNext("primary")
    .expectComplete()
    .verify();

    fallback.assertWasNotSubscribed(); // Verify fallback was not used
}
```

### Testing Error Recovery

Verify that error recovery publishers are triggered:

```java
@Test
public void testErrorRecovery() {
    Flux<String> failing = Flux.error(new RuntimeException("Error"));
    PublisherProbe<String> recovery = PublisherProbe.of(Flux.just("recovered"));

    StepVerifier.create(
        failing.onErrorResume(ex -> recovery.flux())
    )
    .expectNext("recovered")
    .expectComplete()
    .verify();

    recovery.assertWasSubscribed(); // Verify recovery was triggered
}
```
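
A related recovery check is counting resubscriptions: when the failing source itself is wrapped in a probe, `subscribeCount()` can confirm how many attempts a retry operator made. The sketch below is one way to do this, assuming Reactor's `retry(long)` operator resubscribes once per allowed retry; the class and method names are hypothetical.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.test.publisher.PublisherProbe;

class ProbeRetrySketch {

    // Returns how many times the always-failing source was subscribed to.
    static long subscriptionsWithRetry(long retries) {
        Flux<String> failing = Flux.error(new IllegalStateException("boom"));
        PublisherProbe<String> probe = PublisherProbe.of(failing);

        // The source never succeeds, so the error surfaces after the retries are exhausted
        StepVerifier.create(probe.flux().retry(retries))
                .expectError(IllegalStateException.class)
                .verify();

        return probe.subscribeCount();
    }

    public static void main(String[] args) {
        // One initial attempt plus two retries
        System.out.println("subscriptions: " + subscriptionsWithRetry(2));
    }
}
```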

### Testing Complex Control Flow

Verify multiple conditional paths in complex reactive chains:

```java
@Test
public void testComplexControlFlow() {
    PublisherProbe<String> cacheHit = PublisherProbe.of(Flux.just("cached"));
    PublisherProbe<String> networkCall = PublisherProbe.of(Flux.just("network"));
    PublisherProbe<String> fallback = PublisherProbe.of(Flux.just("fallback"));

    boolean cacheAvailable = false;
    boolean networkAvailable = true;

    // The explicit <String> type witness is needed: the conditional has no target type here
    Flux<String> result = (cacheAvailable ? cacheHit.flux() : Flux.<String>empty())
        .switchIfEmpty(networkAvailable ? networkCall.flux() : fallback.flux());

    StepVerifier.create(result)
        .expectNext("network")
        .expectComplete()
        .verify();

    // Verify execution path
    cacheHit.assertWasNotSubscribed(); // Cache was not available
    networkCall.assertWasSubscribed(); // Network was used
    fallback.assertWasNotSubscribed(); // Fallback was not needed
}
```

### Integration with Other Testing Utilities

PublisherProbe works well with other reactor-test utilities:

```java
@Test
public void testWithStepVerifier() {
    PublisherProbe<String> probe = PublisherProbe.of(
        Flux.just("a", "b", "c").delayElements(Duration.ofMillis(100))
    );

    StepVerifier.create(probe.flux())
        .expectNext("a", "b", "c")
        .expectComplete()
        .verify();

    // Verify subscription behavior
    probe.assertWasSubscribed();
    probe.assertWasRequested();
    assert probe.subscribeCount() == 1;
}

@Test
public void testWithTestPublisher() {
    TestPublisher<String> testPub = TestPublisher.create();
    PublisherProbe<String> probe = PublisherProbe.of(testPub.flux());

    StepVerifier.create(probe.flux())
        .then(() -> testPub.next("test").complete())
        .expectNext("test")
        .expectComplete()
        .verify();

    probe.assertWasSubscribed();
    // assertSubscribers(n) counts currently active subscribers, which drops to zero
    // once the publisher has completed, so check the subscription history instead
    testPub.assertWasSubscribed();
}
```