or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

graph-stage-testing.md index.md stream-lifecycle.md test-publishers.md test-sources-sinks.md test-subscribers.md

test-publishers.mddocs/

0

# Test Publishers

1

2

Publisher utilities for creating controllable upstream sources in tests. Test publishers implement the Reactive Streams Publisher interface while providing fine-grained control over element emission, demand tracking, and subscription lifecycle management.

3

4

## Capabilities

5

6

### Factory Methods

7

8

Static factory methods for creating various types of test publishers with different behavior patterns.

9

10

```scala { .api }

11

/**

12

* Publisher that signals complete to subscribers immediately after handing them a void subscription

13

*/

14

def empty[T](): Publisher[T]

15

16

/**

17

* Publisher that subscribes the subscriber and completes after the first request

18

*/

19

def lazyEmpty[T]: Publisher[T]

20

21

/**

22

* Publisher that signals error to subscribers immediately after handing out a subscription

23

*/

24

def error[T](cause: Throwable): Publisher[T]

25

26

/**

27

* Publisher that subscribes the subscriber and signals error after the first request

28

*/

29

def lazyError[T](cause: Throwable): Publisher[T]

30

31

/**

32

* Creates a manual probe that implements the Publisher interface

33

* @param autoOnSubscribe Whether to automatically call onSubscribe (default: true)

34

*/

35

def manualProbe[T](autoOnSubscribe: Boolean = true)(implicit system: ActorSystem): ManualProbe[T]

36

37

/**

38

* Creates a probe that implements the Publisher interface and tracks demand

39

* @param initialPendingRequests Initial number of pending requests (default: 0)

40

*/

41

def probe[T](initialPendingRequests: Long = 0)(implicit system: ActorSystem): Probe[T]

42

```

43

44

**Usage Examples:**

45

46

```scala

47

import akka.stream.testkit.TestPublisher

48

49

// Empty publishers for testing completion scenarios

50

val emptyPub = TestPublisher.empty[String]()

51

val lazyEmptyPub = TestPublisher.lazyEmpty[String]

52

53

// Error publishers for testing error scenarios

54

val errorPub = TestPublisher.error[String](new RuntimeException("test error"))

55

val lazyErrorPub = TestPublisher.lazyError[String](new RuntimeException("test error"))

56

57

// Manual control publishers

58

val manualPub = TestPublisher.manualProbe[String]()

59

val autoPub = TestPublisher.probe[String]()

60

```

61

62

### ManualProbe Class

63

64

Manual test publisher that provides complete control over the publishing lifecycle and requires explicit management of subscription and demand.

65

66

```scala { .api }

67

class ManualProbe[I] extends Publisher[I] {

68

type Self <: ManualProbe[I]

69

70

/**

71

* Subscribes a given Subscriber to this probe publisher

72

*/

73

def subscribe(subscriber: Subscriber[_ >: I]): Unit

74

75

/**

76

* Execute code block after subscription is established

77

*/

78

def executeAfterSubscription[T](f: => T): T

79

80

/**

81

* Expect a subscription and return the subscription object

82

*/

83

def expectSubscription(): PublisherProbeSubscription[I]

84

85

/**

86

* Expect demand from a given subscription

87

*/

88

def expectRequest(subscription: Subscription, n: Int): Self

89

90

/**

91

* Expect no messages for the default period

92

*/

93

def expectNoMessage(): Self

94

95

/**

96

* Expect no messages for a given duration

97

*/

98

def expectNoMessage(max: FiniteDuration): Self

99

100

/**

101

* Receive messages for a given duration or until one does not match a given partial function

102

*/

103

def receiveWhile[T](

104

max: Duration = Duration.Undefined,

105

idle: Duration = Duration.Inf,

106

messages: Int = Int.MaxValue

107

)(f: PartialFunction[PublisherEvent, T]): immutable.Seq[T]

108

109

/**

110

* Expect an event matching the partial function

111

*/

112

def expectEventPF[T](f: PartialFunction[PublisherEvent, T]): T

113

114

/**

115

* Get the Publisher interface

116

*/

117

def getPublisher: Publisher[I]

118

119

/**

120

* Execute code block while bounding its execution time between min and max

121

*/

122

def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T

123

124

/**

125

* Execute code block within the specified maximum time

126

*/

127

def within[T](max: FiniteDuration)(f: => T): T

128

}

129

```

130

131

**Usage Examples:**

132

133

```scala

134

import akka.stream.testkit.TestPublisher

135

136

val probe = TestPublisher.manualProbe[String]()

137

val subscription = probe.expectSubscription()

138

139

// Expect demand and send elements

140

probe.expectRequest(subscription, 1)

141

subscription.sendNext("hello")

142

143

probe.expectRequest(subscription, 2)

144

subscription.sendNext("world")

145

subscription.sendComplete()

146

```

147

148

### Probe Class

149

150

Test publisher with automatic demand tracking that extends ManualProbe with simplified element sending and demand management.

151

152

```scala { .api }

153

class Probe[T] extends ManualProbe[T] {

154

type Self = Probe[T]

155

156

/**

157

* Asserts that a subscription has been received or will be received

158

*/

159

def ensureSubscription(): Unit

160

161

/**

162

* Current pending requests

163

*/

164

def pending: Long

165

166

/**

167

* Send next element, automatically checking and decrementing demand

168

*/

169

def sendNext(elem: T): Self

170

171

/**

172

* Send next element without checking demand (unsafe)

173

*/

174

def unsafeSendNext(elem: T): Self

175

176

/**

177

* Send completion signal

178

*/

179

def sendComplete(): Self

180

181

/**

182

* Send error signal

183

*/

184

def sendError(cause: Throwable): Self

185

186

/**

187

* Expect request and add to pending requests, returning the request amount

188

*/

189

def expectRequest(): Long

190

191

/**

192

* Expect cancellation

193

*/

194

def expectCancellation(): Self

195

196

/**

197

* Expect cancellation with specific cause

198

*/

199

def expectCancellationWithCause(expectedCause: Throwable): Self

200

201

/**

202

* Expect cancellation with typed cause, returning the cause

203

*/

204

def expectCancellationWithCause[E <: Throwable: ClassTag](): E

205

206

/**

207

* Java API: Expect cancellation with specific cause class

208

*/

209

def expectCancellationWithCause[E <: Throwable](causeClass: Class[E]): E

210

}

211

```

212

213

**Usage Examples:**

214

215

```scala

216

import akka.stream.testkit.TestPublisher

217

218

val probe = TestPublisher.probe[Int]()

219

220

// Send elements with automatic demand management

221

probe.sendNext(1)

222

probe.sendNext(2)

223

probe.sendNext(3)

224

probe.sendComplete()

225

226

// Check pending demand

227

println(s"Pending requests: ${probe.pending}")

228

229

// Handle cancellation scenarios

230

probe.expectCancellation()

231

232

// Alternatively, expect cancellation with a typed cause and capture the cause

233

val cause = probe.expectCancellationWithCause[IllegalArgumentException]()

234

```

235

236

### PublisherProbeSubscription

237

238

Internal subscription implementation that bridges the test probe with subscribers, providing methods for sending elements and expecting subscription events.

239

240

```scala { .api }

241

case class PublisherProbeSubscription[I](

242

subscriber: Subscriber[_ >: I],

243

publisherProbe: TestProbe

244

) extends Subscription with SubscriptionWithCancelException {

245

246

/**

247

* Request elements from the subscription

248

*/

249

def request(elements: Long): Unit

250

251

/**

252

* Cancel subscription with cause

253

*/

254

def cancel(cause: Throwable): Unit

255

256

/**

257

* Expect a specific request amount

258

*/

259

def expectRequest(n: Long): Unit

260

261

/**

262

* Expect any request and return the amount

263

*/

264

def expectRequest(): Long

265

266

/**

267

* Expect cancellation and return the cause

268

*/

269

def expectCancellation(): Throwable

270

271

/**

272

* Send next element to subscriber

273

*/

274

def sendNext(element: I): Unit

275

276

/**

277

* Send completion signal to subscriber

278

*/

279

def sendComplete(): Unit

280

281

/**

282

* Send error signal to subscriber

283

*/

284

def sendError(cause: Throwable): Unit

285

286

/**

287

* Send onSubscribe signal to subscriber

288

*/

289

def sendOnSubscribe(): Unit

290

}

291

```

292

293

## Publisher Events

294

295

```scala { .api }

296

sealed trait PublisherEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded

297

298

/**

299

* Emitted when a subscriber subscribes to the publisher

300

*/

301

final case class Subscribe(subscription: Subscription) extends PublisherEvent

302

303

/**

304

* Emitted when a subscription is cancelled

305

*/

306

final case class CancelSubscription(subscription: Subscription, cause: Throwable) extends PublisherEvent

307

308

/**

309

* Emitted when a subscriber requests more elements

310

*/

311

final case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent

312

```

313

314

## Common Patterns

315

316

### Testing Backpressure

317

318

```scala

319

val probe = TestPublisher.probe[Int]()

320

321

// Send multiple elements

322

probe.sendNext(1)

323

probe.sendNext(2)

324

probe.sendNext(3)

325

326

// Verify demand management

327

probe.expectRequest() should be > 0L

328

```

329

330

### Testing Error Scenarios

331

332

```scala

333

val probe = TestPublisher.probe[String]()

334

val error = new RuntimeException("test failure")

335

336

probe.sendError(error)

337

// Verify error handling in downstream

338

```

339

340

### Testing Stream Completion

341

342

```scala

343

val probe = TestPublisher.probe[Int]()

344

345

probe.sendNext(1)

346

probe.sendNext(2)

347

probe.sendComplete()

348

349

// Verify completion behavior

350

```