or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

graph-stage-testing.md index.md stream-lifecycle.md test-publishers.md test-sources-sinks.md test-subscribers.md

docs/test-subscribers.md

0

# Test Subscribers

1

2

Subscriber utilities for creating controllable downstream sinks in tests. Test subscribers implement the Reactive Streams Subscriber interface while providing comprehensive assertion capabilities for verifying stream behavior, element expectations, and timing requirements.

3

4

## Capabilities

5

6

### Factory Methods

7

8

Factory methods on the `TestSubscriber` object for creating test subscribers with different control mechanisms.

9

10

```scala { .api }

11

/**

12

* Creates a manual probe that implements the Subscriber interface

13

*/

14

def manualProbe[T]()(implicit system: ActorSystem): ManualProbe[T]

15

16

/**

17

* Creates a subscriber probe with automatic subscription management

18

*/

19

def probe[T]()(implicit system: ActorSystem): Probe[T]

20

```

21

22

**Usage Examples:**

23

24

```scala

25

import akka.stream.testkit.TestSubscriber

26

27

// Manual control subscriber

28

val manualProbe = TestSubscriber.manualProbe[String]()

29

30

// Automatic subscription management

31

val autoProbe = TestSubscriber.probe[String]()

32

```

33

34

### ManualProbe Class

35

36

Manual test subscriber that provides complete control over subscription management and element expectations with comprehensive assertion methods.

37

38

```scala { .api }

39

class ManualProbe[I] extends Subscriber[I] {

40

type Self <: ManualProbe[I]

41

42

/**

43

* Expect and return a Subscription

44

*/

45

def expectSubscription(): Subscription

46

47

/**

48

* Expect and return any SubscriberEvent (OnSubscribe, OnNext, OnError, or OnComplete)

49

*/

50

def expectEvent(): SubscriberEvent

51

def expectEvent(max: FiniteDuration): SubscriberEvent

52

def expectEvent(event: SubscriberEvent): Self

53

54

/**

55

* Expect and return a stream element

56

*/

57

def expectNext(): I

58

def expectNext(d: FiniteDuration): I

59

def expectNext(element: I): Self

60

def expectNext(d: FiniteDuration, element: I): Self

61

62

/**

63

* Expect multiple stream elements in order

64

*/

65

def expectNext(e1: I, e2: I, es: I*): Self

66

67

/**

68

* Expect multiple stream elements in arbitrary order

69

*/

70

def expectNextUnordered(e1: I, e2: I, es: I*): Self

71

72

/**

73

* Expect and return the next N stream elements

74

*/

75

def expectNextN(n: Long): immutable.Seq[I]

76

def expectNextN(all: immutable.Seq[I]): Self

77

78

/**

79

* Expect all elements of the given sequence, in any order

80

*/

81

def expectNextUnorderedN(all: immutable.Seq[I]): Self

82

83

/**

84

* Expect stream completion

85

*/

86

def expectComplete(): Self

87

88

/**

89

* Expect and return the signalled Throwable

90

*/

91

def expectError(): Throwable

92

def expectError(cause: Throwable): Self

93

94

/**

95

* Expect subscription followed by error (with optional demand signaling)

96

*/

97

def expectSubscriptionAndError(): Throwable

98

def expectSubscriptionAndError(signalDemand: Boolean): Throwable

99

def expectSubscriptionAndError(cause: Throwable): Self

100

def expectSubscriptionAndError(cause: Throwable, signalDemand: Boolean): Self

101

102

/**

103

* Expect subscription followed by completion (with optional demand signaling)

104

*/

105

def expectSubscriptionAndComplete(): Self

106

def expectSubscriptionAndComplete(signalDemand: Boolean): Self

107

108

/**

109

* Expect next element or error signal, returning whichever was signalled

110

*/

111

def expectNextOrError(): Either[Throwable, I]

112

def expectNextOrError(element: I, cause: Throwable): Either[Throwable, I]

113

114

/**

115

* Expect next element or stream completion

116

*/

117

def expectNextOrComplete(): Either[OnComplete.type, I]

118

def expectNextOrComplete(element: I): Self

119

120

/**

121

* Assert that no message is received for the specified time

122

*/

123

def expectNoMessage(): Self

124

def expectNoMessage(remaining: FiniteDuration): Self

125

def expectNoMessage(remaining: java.time.Duration): Self

126

127

/**

128

* Expect a stream element and test it with partial function

129

*/

130

def expectNextPF[T](f: PartialFunction[Any, T]): T

131

def expectNextWithTimeoutPF[T](max: Duration, f: PartialFunction[Any, T]): T

132

def expectNextChainingPF(f: PartialFunction[Any, Any]): Self

133

def expectNextChainingPF(max: Duration, f: PartialFunction[Any, Any]): Self

134

135

/**

136

* Expect event matching partial function

137

*/

138

def expectEventPF[T](f: PartialFunction[SubscriberEvent, T]): T

139

def expectEventWithTimeoutPF[T](max: Duration, f: PartialFunction[SubscriberEvent, T]): T

140

141

/**

142

* Receive messages for a given duration or until one does not match a given partial function

143

*/

144

def receiveWhile[T](

145

max: Duration = Duration.Undefined,

146

idle: Duration = Duration.Inf,

147

messages: Int = Int.MaxValue

148

)(f: PartialFunction[SubscriberEvent, T]): immutable.Seq[T]

149

150

/**

151

* Receive messages within time limit

152

*/

153

def receiveWithin(max: FiniteDuration, messages: Int = Int.MaxValue): immutable.Seq[I]

154

155

/**

156

* Attempt to drain the stream into a strict collection (requests Long.MaxValue elements)

157

* WARNING: Use with caution for infinite streams or large elements

158

*/

159

def toStrict(atMost: FiniteDuration): immutable.Seq[I]

160

161

/**

162

* Execute code block while bounding its execution time

163

*/

164

def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T

165

def within[T](max: FiniteDuration)(f: => T): T

166

167

// Subscriber interface methods

168

def onSubscribe(subscription: Subscription): Unit

169

def onNext(element: I): Unit

170

def onComplete(): Unit

171

def onError(cause: Throwable): Unit

172

}

173

```

174

175

**Usage Examples:**

176

177

```scala

178

import akka.stream.testkit.TestSubscriber

179

import scala.concurrent.duration._

180

181

val probe = TestSubscriber.manualProbe[String]()

182

183

// Subscribe to a source

184

source.subscribe(probe)

185

186

// Expect subscription and request elements

187

val subscription = probe.expectSubscription()

188

subscription.request(3)

189

190

// Expect specific elements

191

probe.expectNext("hello")

192

probe.expectNext("world")

193

probe.expectComplete()

194

195

// Expect elements with timeout

196

probe.expectNext(1.second, "delayed")

197

198

// Expect multiple elements

199

probe.expectNext("a", "b", "c")

200

201

// Expect elements in any order

202

probe.expectNextUnordered("x", "y", "z")

203

204

// Pattern matching on elements

205

probe.expectNextPF {

206

case s: String if s.startsWith("test") => s.length

207

}

208

```

209

210

### Probe Class

211

212

Test subscriber with automatic subscription management that extends ManualProbe with simplified request handling and additional convenience methods.

213

214

```scala { .api }

215

class Probe[T] extends ManualProbe[T] {

216

override type Self = Probe[T]

217

218

/**

219

* Asserts that a subscription has been received or will be received

220

*/

221

def ensureSubscription(): Self

222

223

/**

224

* Request N elements from the subscription

225

*/

226

def request(n: Long): Self

227

228

/**

229

* Request and expect a specific stream element

230

*/

231

def requestNext(element: T): Self

232

233

/**

234

* Request and expect the next stream element, returning it

235

*/

236

def requestNext(): T

237

def requestNext(d: FiniteDuration): T

238

239

/**

240

* Cancel the subscription

241

*/

242

def cancel(): Self

243

244

/**

245

* Cancel the subscription with a specific cause

246

*/

247

def cancel(cause: Throwable): Self

248

}

249

```

250

251

**Usage Examples:**

252

253

```scala

254

import akka.stream.testkit.TestSubscriber

255

256

val probe = TestSubscriber.probe[Int]()

257

258

// Automatic subscription management

259

source.runWith(Sink.fromSubscriber(probe))

260

261

// Request and expect elements

262

probe.request(1)

263

val element = probe.expectNext()

264

265

// Request and expect specific element

266

probe.requestNext(42)

267

268

// Request and return element

269

val next = probe.requestNext()

270

271

// Cancel subscription

272

probe.cancel()

273

274

// Cancel with cause

275

probe.cancel(new RuntimeException("test cancellation"))

276

```

277

278

## Subscriber Events

279

280

```scala { .api }

281

sealed trait SubscriberEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded

282

283

/**

284

* Received when the subscriber is subscribed to a publisher

285

*/

286

final case class OnSubscribe(subscription: Subscription) extends SubscriberEvent

287

288

/**

289

* Received when a stream element arrives

290

*/

291

final case class OnNext[I](element: I) extends SubscriberEvent

292

293

/**

294

* Received when the stream completes successfully

295

*/

296

case object OnComplete extends SubscriberEvent

297

298

/**

299

* Received when the stream terminates with an error

300

*/

301

final case class OnError(cause: Throwable) extends SubscriberEvent

302

```

303

304

## Advanced Usage Patterns

305

306

### Testing Stream Completion

307

308

```scala

309

val probe = TestSubscriber.probe[String]()

310

311

// Test immediate completion

312

probe.expectSubscriptionAndComplete()

313

314

// Alternative scenario (use a fresh probe): test completion after elements

315

val subscription = probe.expectSubscription()

316

subscription.request(2)

317

probe.expectNext("a", "b")

318

probe.expectComplete()

319

```

320

321

### Testing Error Scenarios

322

323

```scala

324

val probe = TestSubscriber.probe[String]()

325

326

// Test immediate error

327

val error = probe.expectSubscriptionAndError()

328

error shouldBe a[RuntimeException]

329

330

// Alternative scenario (use a fresh probe): test error after elements

331

val subscription = probe.expectSubscription()

332

subscription.request(1)

333

probe.expectNext("element")

334

val thrownError = probe.expectError()

335

```

336

337

### Testing Backpressure

338

339

```scala

340

val probe = TestSubscriber.probe[Int]()

341

342

// Don't request initially, verify no elements arrive

343

probe.expectSubscription()

344

probe.expectNoMessage(100.millis)

345

346

// Request and verify elements arrive

347

probe.request(2)

348

probe.expectNext(1, 2)

349

```

350

351

### Testing Large Streams

352

353

```scala

354

val probe = TestSubscriber.probe[Int]()

355

356

// Request many elements

357

probe.request(1000)

358

359

// Collect the next 1000 elements and verify how many arrived

360

val elements = probe.expectNextN(1000)

361

elements should have size 1000

362

363

// Or drain to strict collection (use carefully)

364

val allElements = probe.toStrict(5.seconds)

365

```

366

367

### Conditional Element Testing

368

369

```scala

370

val probe = TestSubscriber.probe[String]()

371

372

// Test elements with partial function

373

probe.expectNextPF {

374

case s: String if s.length > 5 => s.toUpperCase

375

}

376

377

// Chain multiple expectations

378

probe

379

.expectNextChainingPF {

380

case s: String => s.trim

381

}

382

.expectNextChainingPF {

383

case s: String if s.nonEmpty => s

384

}

385

```

386

387

### Testing Alternative Outcomes

388

389

```scala

390

val probe = TestSubscriber.probe[String]()

391

392

// Expect either next element or error

393

probe.expectNextOrError() match {

394

case Right(element) => println(s"Got element: $element")

395

case Left(error) => println(s"Got error: $error")

396

}

397

398

// Expect either next element or completion

399

probe.expectNextOrComplete() match {

400

case Right(element) => println(s"Got element: $element")

401

case Left(OnComplete) => println("Stream completed")

402

}

403

```