# Akka Stream TestKit

Akka Stream TestKit provides testing utilities for Akka Streams applications. It offers controllable test sources and sinks, assertions for verifying stream behavior, and utilities for testing custom graph stages and stream lifecycle management.

## Package Information

- **Package Name**: akka-stream-testkit_3
- **Package Type**: maven (SBT)
- **Language**: Scala (with Java API)
- **Group ID**: com.typesafe.akka
- **Artifact ID**: akka-stream-testkit_3
- **Version**: 2.8.8
- **Installation**: `"com.typesafe.akka" %% "akka-stream-testkit" % "2.8.8" % Test`

## Core Imports

### Scala DSL

```scala
import akka.stream.testkit.scaladsl.{TestSource, TestSink, StreamTestKit}
import akka.stream.testkit.{TestPublisher, TestSubscriber, TestSinkStage, TestSourceStage}
import akka.stream.testkit.GraphStageMessages
```

### Java API

```java
import akka.stream.testkit.javadsl.TestSource;
import akka.stream.testkit.javadsl.TestSink;
import akka.stream.testkit.javadsl.StreamTestKit;
import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.TestSinkStage;
import akka.stream.testkit.TestSourceStage;
import akka.stream.testkit.GraphStageMessages;
```

## Basic Usage

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSource, TestSink}

// Give the implicit an explicit type so it also compiles cleanly under Scala 3
implicit val system: ActorSystem = ActorSystem()

// Create a test source up front: `pub` drives it, `source` is the Source to wire up
val (pub, source) = TestSource[Int]().preMaterialize()

// Create a test sink
val sink = TestSink[Int]()

// Test a simple flow; the pre-materialized source yields NotUsed,
// so only the sink probe is kept
val sinkProbe = source
  .map(_ * 2)
  .toMat(sink)(Keep.right)
  .run()

// Signal demand first: pub.sendNext waits until downstream has requested elements
sinkProbe.request(2)

// Send elements and verify
pub.sendNext(1)
pub.sendNext(2)
pub.sendComplete()

sinkProbe.expectNext(2, 4)
sinkProbe.expectComplete()
```

## Architecture

Akka Stream TestKit is built around several key components:

- **TestPublisher/TestSubscriber**: Core testing utilities implementing Reactive Streams Publisher/Subscriber interfaces
- **Test Sources/Sinks**: Factory methods for creating testable stream endpoints
- **Probe System**: Manual and automatic probes for controlling and observing stream behavior
- **Assertion Framework**: Rich set of expectation methods for verifying stream behavior
- **Graph Stage Testing**: Utilities for testing custom stream processing stages
- **Lifecycle Management**: Tools for asserting proper cleanup of stream resources

## Capabilities

### Test Publishers

Publisher utilities for creating controllable upstream sources in tests, with demand tracking and element injection capabilities.

```scala { .api }
object TestPublisher {
  def empty[T](): Publisher[T]
  def lazyEmpty[T]: Publisher[T]
  def error[T](cause: Throwable): Publisher[T]
  def lazyError[T](cause: Throwable): Publisher[T]
  def manualProbe[T](autoOnSubscribe: Boolean = true)(implicit system: ActorSystem): ManualProbe[T]
  def probe[T](initialPendingRequests: Long = 0)(implicit system: ActorSystem): Probe[T]

  // Factory methods with ClassicActorSystemProvider
  object ManualProbe {
    def apply[T](autoOnSubscribe: Boolean = true)(implicit system: ClassicActorSystemProvider): ManualProbe[T]
  }
}
```

[Test Publishers](./test-publishers.md)
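
As a rough sketch of how the factories above might be used, the following drives a stream by hand through `TestPublisher.probe`. The object name `PublisherProbeSketch`, the `+ 1` mapping, and the timeouts are illustrative assumptions rather than part of the library.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.TestPublisher
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; only the TestPublisher calls come from the testkit.
object PublisherProbeSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("publisher-probe-sketch")
    try {
      // The probe is a Reactive Streams Publisher that the test controls.
      val probe = TestPublisher.probe[Int]()
      val result = Source.fromPublisher(probe).map(_ + 1).runWith(Sink.seq)

      // sendNext waits for downstream demand before emitting each element.
      probe.sendNext(1)
      probe.sendNext(2)
      probe.sendComplete()

      Await.result(result, 3.seconds)
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

Under these assumptions, `PublisherProbeSketch.run()` should return the two injected elements after the mapping has been applied.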

### Test Subscribers

Subscriber utilities for creating controllable downstream sinks in tests, with element expectation and timing assertion capabilities.

```scala { .api }
object TestSubscriber {
  def manualProbe[T]()(implicit system: ActorSystem): ManualProbe[T]
  def probe[T]()(implicit system: ActorSystem): Probe[T]

  // Factory methods with ClassicActorSystemProvider
  object ManualProbe {
    def apply[T]()(implicit system: ClassicActorSystemProvider): ManualProbe[T]
  }
}
```

[Test Subscribers](./test-subscribers.md)
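
The sketch below illustrates one way a `TestSubscriber.probe` could be attached to a stream to control demand and check what arrives. The object name `SubscriberProbeSketch`, the sample source, and the 100 ms quiet period are assumptions made for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.TestSubscriber
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; only the TestSubscriber calls come from the testkit.
object SubscriberProbeSketch {
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("subscriber-probe-sketch")
    try {
      val probe = TestSubscriber.probe[Int]()
      Source(1 to 3).map(_ * 10).runWith(Sink.fromSubscriber(probe))

      // Nothing flows until the probe signals demand.
      probe.request(2)
      probe.expectNext(10, 20)
      // The third element is held back because only two were requested.
      probe.expectNoMessage(100.millis)

      probe.request(1)
      val last = probe.expectNext()
      probe.expectComplete()
      last
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

The explicit `request` calls are the point of the example: backpressure is under the test's control, so it can assert both what arrives and what does not.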

### Test Sources and Sinks

Factory methods for creating Source and Sink instances that materialize to test probes, providing the primary interface for stream testing.

```scala { .api }
// Scala DSL (akka.stream.testkit.scaladsl)
object TestSource {
  def apply[T]()(implicit system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]]
  @deprecated("Use `TestSource()` with implicit ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](implicit system: ActorSystem): Source[T, TestPublisher.Probe[T]]
}

object TestSink {
  def apply[T]()(implicit system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]]
  @deprecated("Use `TestSink()` with implicit ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](implicit system: ActorSystem): Sink[T, TestSubscriber.Probe[T]]
}

// Java API (akka.stream.testkit.javadsl), shown as Scala signatures
object TestSource {
  def create[T](system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]]
  @deprecated("Use `TestSource.create` with ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](system: ActorSystem): Source[T, TestPublisher.Probe[T]]
}

object TestSink {
  def create[T](system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]]
  @deprecated("Use `TestSink.create` with ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](system: ActorSystem): Sink[T, TestSubscriber.Probe[T]]
}
```

[Test Sources and Sinks](./test-sources-sinks.md)
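
A common pattern is to place a flow between a `TestSource` and a `TestSink` and keep both probes. The following is a minimal sketch of that pattern, including error propagation; `FlowUnderTestSketch`, the `doubler` flow, and the `"boom"` message are hypothetical names chosen for the example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import scala.concurrent.Await
import scala.concurrent.duration._

object FlowUnderTestSketch {
  // Hypothetical flow under test.
  val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("flow-under-test-sketch")
    try {
      // Keep.both materializes the publisher probe and the subscriber probe.
      val (pub, sub) =
        TestSource[Int]().via(doubler).toMat(TestSink[Int]())(Keep.both).run()

      sub.request(2)
      pub.sendNext(1)
      pub.sendNext(2)
      sub.expectNext(2, 4)

      // A failure injected upstream should surface at the sink probe.
      pub.sendError(new RuntimeException("boom"))
      sub.expectError().getMessage
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

This sketch uses the non-deprecated `TestSource[Int]()` and `TestSink[Int]()` factories listed above rather than the older `probe` variants.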

### Graph Stage Testing

Utilities for testing custom GraphStage implementations by wrapping them with monitoring capabilities that emit events to test probes.

```scala { .api }
object TestSinkStage {
  def apply[T, M](stageUnderTest: GraphStageWithMaterializedValue[SinkShape[T], M], probe: TestProbe): Sink[T, M]
}

object TestSourceStage {
  def apply[T, M](stageUnderTest: GraphStageWithMaterializedValue[SourceShape[T], M], probe: TestProbe): Source[T, M]
}

// Graph Stage Messages
object GraphStageMessages {
  sealed trait StageMessage
  case object Push extends StageMessage
  case object Pull extends StageMessage
  case object UpstreamFinish extends StageMessage
  case object DownstreamFinish extends StageMessage
  case class Failure(ex: Throwable) extends StageMessage
  case class StageFailure(operation: StageMessage, exception: Throwable)
}
```

[Graph Stage Testing](./graph-stage-testing.md)
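
To illustrate how the wrapper might be used, the sketch below wraps a small custom sink in `TestSinkStage` and checks the messages that reach an `akka.testkit.TestProbe`. The `DiscardSink` stage and the `SinkStageSketch` object are hypothetical, and the expected message sequence assumes the wrapper reports one `Push` per element followed by `UpstreamFinish`.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}
import akka.stream.testkit.{GraphStageMessages, TestSinkStage}
import akka.testkit.TestProbe
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stage under test: a sink that discards every element.
final class DiscardSink[T] extends GraphStage[SinkShape[T]] {
  val in: Inlet[T] = Inlet("DiscardSink.in")
  override val shape: SinkShape[T] = SinkShape(in)

  override def createLogic(attributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
      // A sink has to issue the first pull itself.
      override def preStart(): Unit = pull(in)
      override def onPush(): Unit = { grab(in); pull(in) }
      setHandler(in, this)
    }
}

object SinkStageSketch {
  def run(): Unit = {
    implicit val system: ActorSystem = ActorSystem("sink-stage-sketch")
    try {
      val probe = TestProbe()
      Source(1 to 2).runWith(TestSinkStage(new DiscardSink[Int], probe))

      // Assumed sequence: one Push per element, then the upstream completion.
      probe.expectMsg(GraphStageMessages.Push)
      probe.expectMsg(GraphStageMessages.Push)
      probe.expectMsg(GraphStageMessages.UpstreamFinish)
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

`TestSourceStage` can be applied the same way to a source-shaped stage, in which case the probe would be expected to see `Pull` and `DownstreamFinish` messages instead.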

### Stream Lifecycle Management

Utilities for asserting that stream processing stages are properly cleaned up after test execution, helping detect resource leaks.

```scala { .api }
// Scala DSL (akka.stream.testkit.scaladsl)
object StreamTestKit {
  def assertAllStagesStopped[T](block: => T)(implicit materializer: Materializer): T
}

// Java API (akka.stream.testkit.javadsl), shown as Scala signatures
object StreamTestKit {
  def assertAllStagesStopped(mat: Materializer): Unit
  def assertAllStagesStopped(system: ClassicActorSystemProvider): Unit
}
```

[Stream Lifecycle Management](./stream-lifecycle.md)
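
As a minimal sketch of the Scala variant, a test body can be wrapped in `assertAllStagesStopped` so that the check runs once the block has returned. The object name `StagesStoppedSketch`, the sample stream, and the timeouts are illustrative assumptions.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object wrapping a stream run in the lifecycle assertion.
object StagesStoppedSketch {
  def run(): Seq[Int] = {
    // The implicit Materializer required by assertAllStagesStopped
    // is derived from this implicit ActorSystem.
    implicit val system: ActorSystem = ActorSystem("stages-stopped-sketch")
    try {
      assertAllStagesStopped {
        // The block's value is returned once the stage check has passed.
        Await.result(Source(1 to 3).runWith(Sink.seq), 3.seconds)
      }
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

A stream that never completes or is never cancelled inside the block would be expected to make the assertion fail, which is how leaked stages show up in a test run.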

## Types

```scala { .api }
// Publisher Events (defined in TestPublisher)
sealed trait PublisherEvent
case class Subscribe(subscription: Subscription) extends PublisherEvent
case class CancelSubscription(subscription: Subscription, cause: Throwable) extends PublisherEvent
case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent

// Subscriber Events (defined in TestSubscriber)
sealed trait SubscriberEvent
case class OnSubscribe(subscription: Subscription) extends SubscriberEvent
case class OnNext[I](element: I) extends SubscriberEvent
case object OnComplete extends SubscriberEvent
case class OnError(cause: Throwable) extends SubscriberEvent

// Graph Stage Messages (defined in GraphStageMessages)
sealed trait StageMessage
case object Push extends StageMessage
case object Pull extends StageMessage
case object UpstreamFinish extends StageMessage
case object DownstreamFinish extends StageMessage
case class Failure(ex: Throwable) extends StageMessage
case class StageFailure(operation: StageMessage, exception: Throwable)

// Probe Classes
object TestPublisher {
  class ManualProbe[I] extends Publisher[I] {
    def expectSubscription(): Subscription
    def expectRequest(): Long
    def expectRequest(n: Long): Unit
    def expectCancellation(): Unit
    def expectCancellationWithCause(): Throwable
    def expectNoMessage(): Unit
    def expectNoMessage(remaining: FiniteDuration): Unit
    def sendNext(element: I): Unit
    def sendComplete(): Unit
    def sendError(cause: Throwable): Unit
    def getSubscriber: Subscriber[_ >: I]
  }

  class Probe[I] extends ManualProbe[I] {
    def sendNext(element: I): Unit
    def sendComplete(): Unit
    def sendError(cause: Throwable): Unit
    def expectRequest(): Long
    def expectRequest(n: Long): Unit
    def pending: Long
  }
}

object TestSubscriber {
  class ManualProbe[I] extends Subscriber[I] {
    def expectSubscription(): Subscription
    def expectNext(): I
    def expectNext(element: I): Unit
    def expectNext(d: FiniteDuration, element: I): Unit
    def expectNext(e1: I, e2: I, es: I*): Unit
    def expectNextN(n: Long): immutable.Seq[I]
    def expectNextN(all: immutable.Seq[I]): Unit
    def expectNextUnordered(e1: I, e2: I, es: I*): Unit
    def expectNextUnorderedN(all: immutable.Seq[I]): Unit
    def expectNextOrError(): Either[Throwable, I]
    def expectNextOrComplete(): Either[OnComplete.type, I]
    def expectComplete(): Unit
    def expectError(): Throwable
    def expectError(cause: Throwable): Unit
    def expectNoMessage(): Unit
    def expectNoMessage(remaining: FiniteDuration): Unit
    def request(n: Long): Unit
    def cancel(): Unit
  }

  class Probe[I] extends ManualProbe[I] {
    def request(n: Long): Unit
    def requestNext(): I
    def requestNext(element: I): Unit
  }
}
```