or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md publisher-probe.md step-verifier.md test-publisher.md test-subscriber.md testing-utilities.md virtual-time.md

test-publisher.mddocs/

0

# Controlled Publishers

1

2

TestPublisher provides a manipulable Publisher implementation for controlled signal emission in tests. It allows complete control over when and what signals are emitted, making it ideal for testing subscriber behavior and backpressure scenarios.

3

4

## Capabilities

5

6

### Creating TestPublishers

7

8

Factory methods for creating different types of TestPublisher instances.

9

10

```java { .api }

11

abstract class TestPublisher<T> implements Publisher<T>, PublisherProbe<T> {

12

/** Create standard hot TestPublisher */

13

static <T> TestPublisher<T> create();

14

15

/** Create spec-violating TestPublisher with specified violations */

16

static <T> TestPublisher<T> createNoncompliant(Violation first, Violation... rest);

17

18

/** Create cold TestPublisher with buffering support for late subscribers */

19

static <T> TestPublisher<T> createCold();

20

21

/** Create cold TestPublisher that errors on overflow instead of buffering */

22

static <T> TestPublisher<T> createColdNonBuffering();

23

24

/** Create cold non-compliant TestPublisher with custom behavior */

25

static <T> TestPublisher<T> createColdNonCompliant(

26

boolean errorOnOverflow,

27

Violation firstViolation,

28

Violation... otherViolations

29

);

30

}

31

```

32

33

**Usage Examples:**

34

35

```java

36

import reactor.test.publisher.TestPublisher;

37

38

// Hot publisher - signals sent to current subscribers only

39

TestPublisher<String> hotPublisher = TestPublisher.create();

40

hotPublisher.next("early"); // Lost if no subscribers yet

41

42

Flux<String> flux = hotPublisher.flux();

43

flux.subscribe(System.out::println);

44

hotPublisher.next("hello"); // Delivered to subscriber

45

hotPublisher.complete();

46

47

// Cold publisher - buffers signals for late subscribers

48

TestPublisher<Integer> coldPublisher = TestPublisher.createCold();

49

coldPublisher.next(1, 2, 3); // Buffered

50

coldPublisher.complete();

51

52

// Late subscriber receives all buffered signals

53

coldPublisher.flux().subscribe(System.out::println); // Prints: 1, 2, 3

54

55

// Non-compliant publisher for testing error handling

56

TestPublisher<String> badPublisher = TestPublisher.createNoncompliant(

57

TestPublisher.Violation.ALLOW_NULL

58

);

59

badPublisher.next(null); // Normally illegal, but allowed

60

```

61

62

### Conversion Methods

63

64

Convert TestPublisher to standard Reactor types.

65

66

```java { .api }

67

abstract class TestPublisher<T> {

68

/** Wrap as Flux */

69

Flux<T> flux();

70

71

/** Wrap as Mono (a spec-compliant publisher is cancelled after its first element) */

72

Mono<T> mono();

73

}

74

```

75

76

### Signal Emission

77

78

Methods for manually emitting reactive signals.

79

80

```java { .api }

81

abstract class TestPublisher<T> {

82

/** Emit single onNext signal */

83

TestPublisher<T> next(@Nullable T value);

84

85

/** Emit multiple onNext signals */

86

TestPublisher<T> next(T first, T... rest);

87

88

/** Emit values and complete immediately */

89

TestPublisher<T> emit(T... values);

90

91

/** Trigger error signal */

92

TestPublisher<T> error(Throwable t);

93

94

/** Trigger completion signal */

95

TestPublisher<T> complete();

96

}

97

```

98

99

**Usage Examples:**

100

101

```java

102

TestPublisher<String> publisher = TestPublisher.create();

103

104

// Emit signals one by one

105

publisher.next("first")

106

.next("second")

107

.complete();

108

109

// Emit multiple values at once

110

publisher.next("hello", "world", "!");

111

112

// Emit values and complete in one call

113

TestPublisher.create()

114

.emit("alpha", "beta", "gamma"); // Automatically completes

115

116

// Error scenarios

117

publisher.error(new RuntimeException("Something went wrong"));

118

```

119

120

### Subscription Assertions

121

122

Methods for asserting subscriber behavior and request patterns.

123

124

```java { .api }

125

abstract class TestPublisher<T> {

126

/** Assert minimum request amount received */

127

TestPublisher<T> assertMinRequested(long n);

128

129

/** Assert maximum request amount received */

130

TestPublisher<T> assertMaxRequested(long n);

131

132

/** Assert has at least one subscriber */

133

TestPublisher<T> assertSubscribers();

134

135

/** Assert specific number of subscribers */

136

TestPublisher<T> assertSubscribers(int n);

137

138

/** Assert no subscribers */

139

TestPublisher<T> assertNoSubscribers();

140

141

/** Assert has cancelled subscribers */

142

TestPublisher<T> assertCancelled();

143

144

/** Assert specific number of cancelled subscribers */

145

TestPublisher<T> assertCancelled(int n);

146

147

/** Assert no cancelled subscribers */

148

TestPublisher<T> assertNotCancelled();

149

150

/** Assert request overflow occurred (a subscriber received onNext while its requested amount was 0) */

151

TestPublisher<T> assertRequestOverflow();

152

153

/** Assert no request overflow */

154

TestPublisher<T> assertNoRequestOverflow();

155

}

156

```

157

158

**Usage Examples:**

159

160

```java

161

TestPublisher<Integer> publisher = TestPublisher.create();

162

163

// Subscribe and make requests

164

Disposable subscription = publisher.flux()

165

.subscribe(System.out::println);

166

167

// Test subscription behavior

168

publisher.assertSubscribers(1) // Has 1 subscriber

169

.assertMinRequested(1) // Requested at least 1 element

170

.next(42) // Send value

171

.assertNotCancelled() // Still subscribed

172

.complete();

173

174

// Test cancellation

175

subscription.dispose();

176

publisher.assertCancelled(1); // 1 cancelled subscriber

177

178

// Test backpressure

179

publisher.flux()

180

.take(5) // Subscriber will cancel after 5

181

.subscribe();

182

183

publisher.next(1, 2, 3, 4, 5, 6) // Send 6 values

184

.assertCancelled(); // Subscriber cancelled after 5

185

```

186

187

### Publisher Probe Integration

188

189

TestPublisher implements PublisherProbe for subscription event tracking.

190

191

```java { .api }

192

// TestPublisher implements PublisherProbe

193

interface PublisherProbe<T> {

194

boolean wasSubscribed();

195

long subscribeCount();

196

boolean wasCancelled();

197

boolean wasRequested();

198

199

void assertWasSubscribed();

200

void assertWasNotSubscribed();

201

void assertWasCancelled();

202

void assertWasNotCancelled();

203

void assertWasRequested();

204

void assertWasNotRequested();

205

}

206

```

207

208

## Types

209

210

```java { .api }

211

// Reactive Streams specification violations for testing error handling

212

enum TestPublisher.Violation {

213

/** Allow next calls despite insufficient request (violates backpressure) */

214

REQUEST_OVERFLOW,

215

216

/** Allow null values in next calls (violates non-null requirement) */

217

ALLOW_NULL,

218

219

/** Allow multiple termination signals (violates single terminal) */

220

CLEANUP_ON_TERMINATE,

221

222

/** Ignore cancellation signals (violates cancellation semantics) */

223

DEFER_CANCELLATION

224

}

225

```

226

227

**Violation Usage Examples:**

228

229

```java

230

// Test handling of backpressure violations

231

TestPublisher<String> overflowPublisher = TestPublisher.createNoncompliant(

232

TestPublisher.Violation.REQUEST_OVERFLOW

233

);

234

235

StepVerifier.create(overflowPublisher.flux(), 0) // Request 0 initially

236

.then(() -> overflowPublisher.next("overflow")) // Send despite no request

237

.thenRequest(1)

238

.expectNext("overflow")

239

.expectComplete()

240

.verify();

241

242

// Test null value handling

243

TestPublisher<String> nullPublisher = TestPublisher.createNoncompliant(

244

TestPublisher.Violation.ALLOW_NULL

245

);

246

247

StepVerifier.create(nullPublisher.flux())

248

.then(() -> nullPublisher.next(null).complete())

249

.expectNext((String) null)

250

.expectComplete()

251

.verify();

252

253

// Test multiple termination signals

254

TestPublisher<String> multiTermPublisher = TestPublisher.createNoncompliant(

255

TestPublisher.Violation.CLEANUP_ON_TERMINATE

256

);

257

258

StepVerifier.create(multiTermPublisher.flux())

259

.then(() -> {

260

multiTermPublisher.complete(); // First termination

261

multiTermPublisher.error(new RuntimeException()); // Second termination

262

})

263

.expectComplete() // Only first termination takes effect

264

.verify();

265

266

// Test cancellation deferral

267

TestPublisher<Integer> deferCancelPublisher = TestPublisher.createNoncompliant(

268

TestPublisher.Violation.DEFER_CANCELLATION

269

);

270

271

// Publisher ignores cancellation and continues emitting

272

Disposable subscription = deferCancelPublisher.flux()

273

.subscribe(System.out::println);

274

275

subscription.dispose(); // Cancel subscription

276

deferCancelPublisher.next(1, 2, 3); // Still emits despite cancellation

277

```