
# Akka Stream TestKit


Akka Stream TestKit provides comprehensive testing utilities for Akka Streams applications. It offers controllable test sources and sinks, assertion capabilities for stream behavior verification, and utilities for testing custom graph stages and stream lifecycle management.


## Package Information

- **Package Name**: akka-stream-testkit_3
- **Package Type**: maven (SBT)
- **Language**: Scala (with Java API)
- **Group ID**: com.typesafe.akka
- **Artifact ID**: akka-stream-testkit_3
- **Version**: 2.8.8
- **Installation**: `"com.typesafe.akka" %% "akka-stream-testkit" % "2.8.8" % Test`
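
As a rough sketch, the installation line above could sit in a `build.sbt` like the following. The `AkkaVersion` value name and the extra `akka-stream` dependency are assumptions added for illustration; Akka modules are expected to share the same version.

```scala
// build.sbt (sketch; `AkkaVersion` is a hypothetical helper name)
val AkkaVersion = "2.8.8"

libraryDependencies ++= Seq(
  // Assumed runtime dependency of the code under test
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  // The testkit itself, limited to the Test configuration
  "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test
)
```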


## Core Imports


### Scala DSL

```scala
import akka.stream.testkit.scaladsl.{TestSource, TestSink, StreamTestKit}
import akka.stream.testkit.{TestPublisher, TestSubscriber, TestSinkStage, TestSourceStage}
import akka.stream.testkit.GraphStageMessages
```

### Java API

```java
import akka.stream.testkit.javadsl.TestSource;
import akka.stream.testkit.javadsl.TestSink;
import akka.stream.testkit.javadsl.StreamTestKit;
import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.TestSinkStage;
import akka.stream.testkit.TestSourceStage;
import akka.stream.testkit.GraphStageMessages;
```

## Basic Usage

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSource, TestSink}

// Explicit type annotation, as Scala 3 expects for implicit definitions
implicit val system: ActorSystem = ActorSystem()

// Create a test source; `pub` is the publisher probe that drives it
val (pub, source) = TestSource[Int]().preMaterialize()

// Create a test sink
val sink = TestSink[Int]()

// Test a simple flow; `source` is already materialized,
// so only the sink's probe is kept as the materialized value
val sinkProbe = source
  .map(_ * 2)
  .toMat(sink)(Keep.right)
  .run()

// Signal demand, send elements and verify
sinkProbe.request(2)
pub.sendNext(1)
pub.sendNext(2)
pub.sendComplete()

sinkProbe.expectNext(2, 4)
sinkProbe.expectComplete()
```

## Architecture


Akka Stream TestKit is built around several key components:

- **TestPublisher/TestSubscriber**: Core testing utilities implementing Reactive Streams Publisher/Subscriber interfaces
- **Test Sources/Sinks**: Factory methods for creating testable stream endpoints
- **Probe System**: Manual and automatic probes for controlling and observing stream behavior
- **Assertion Framework**: Rich set of expectation methods for verifying stream behavior
- **Graph Stage Testing**: Utilities for testing custom stream processing stages
- **Lifecycle Management**: Tools for asserting proper cleanup of stream resources

## Capabilities


### Test Publishers


Publisher utilities for creating controllable upstream sources in tests, with demand tracking and element injection capabilities.

```scala { .api }
object TestPublisher {
  def empty[T](): Publisher[T]
  def lazyEmpty[T]: Publisher[T]
  def error[T](cause: Throwable): Publisher[T]
  def lazyError[T](cause: Throwable): Publisher[T]
  def manualProbe[T](autoOnSubscribe: Boolean = true)(implicit system: ActorSystem): ManualProbe[T]
  def probe[T](initialPendingRequests: Long = 0)(implicit system: ActorSystem): Probe[T]

  // Factory methods with ClassicActorSystemProvider
  object ManualProbe {
    def apply[T](autoOnSubscribe: Boolean = true)(implicit system: ClassicActorSystemProvider): ManualProbe[T]
  }
}
```

[Test Publishers](./test-publishers.md)
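
As an illustration of the element injection described above, here is a minimal sketch that feeds a stream from a `TestPublisher.probe`. The actor system name, the `map(_ + 1)` stage, and the three-second timeout are assumptions chosen for the example, not part of the testkit.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.TestPublisher
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("publisher-example")

// A controllable upstream: nothing is emitted until the test says so
val publisher = TestPublisher.probe[Int]()

// Hypothetical stream under test: increments each element and collects the results
val result = Source.fromPublisher(publisher).map(_ + 1).runWith(Sink.seq)

// sendNext waits for downstream demand before emitting each element
publisher.sendNext(1)
publisher.sendNext(2)
publisher.sendComplete()

val elements = Await.result(result, 3.seconds)
assert(elements == Seq(2, 3))

system.terminate()
```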


### Test Subscribers


Subscriber utilities for creating controllable downstream sinks in tests, with element expectation and timing assertion capabilities.

```scala { .api }
object TestSubscriber {
  def manualProbe[T]()(implicit system: ActorSystem): ManualProbe[T]
  def probe[T]()(implicit system: ActorSystem): Probe[T]

  // Factory methods with ClassicActorSystemProvider
  object ManualProbe {
    def apply[T]()(implicit system: ClassicActorSystemProvider): ManualProbe[T]
  }
}
```

[Test Subscribers](./test-subscribers.md)
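
The following sketch shows one way a `TestSubscriber.probe` might be used to control demand and check timing. The sample source, the `toUpperCase` stage, and the 100 ms quiet period are assumptions made for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.TestSubscriber
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("subscriber-example")

// A controllable downstream: elements arrive only when the test requests them
val subscriber = TestSubscriber.probe[String]()

Source(List("a", "b", "c"))
  .map(_.toUpperCase)
  .runWith(Sink.fromSubscriber(subscriber))

subscriber.request(2)
subscriber.expectNext("A", "B")

// No further demand was signalled, so nothing else should arrive yet
subscriber.expectNoMessage(100.millis)

// requestNext() requests one element and returns it
val third = subscriber.requestNext()
assert(third == "C")
subscriber.expectComplete()

system.terminate()
```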


### Test Sources and Sinks


Factory methods for creating Source and Sink instances that materialize to test probes, providing the primary interface for stream testing.

```scala { .api }
// Scala DSL
object TestSource {
  def apply[T]()(implicit system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]]

  @deprecated("Use `TestSource()` with implicit ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](implicit system: ActorSystem): Source[T, TestPublisher.Probe[T]]
}

object TestSink {
  def apply[T]()(implicit system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]]

  @deprecated("Use `TestSink()` with implicit ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](implicit system: ActorSystem): Sink[T, TestSubscriber.Probe[T]]
}

// Java API
object TestSource {
  def create[T](system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]]

  @deprecated("Use `TestSource.create` with ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](system: ActorSystem): Source[T, TestPublisher.Probe[T]]
}

object TestSink {
  def create[T](system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]]

  @deprecated("Use `TestSink.create` with ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](system: ActorSystem): Sink[T, TestSubscriber.Probe[T]]
}
```

[Test Sources and Sinks](./test-sources-sinks.md)
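
To show how both probes can be obtained from a single materialization, here is a sketch that places a flow between `TestSource` and `TestSink` and checks both filtering and error propagation. The `flowUnderTest` value and the "boom" exception are hypothetical stand-ins for real code under test.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

implicit val system: ActorSystem = ActorSystem("source-sink-example")

// Hypothetical flow under test: lets only even numbers through
val flowUnderTest = Flow[Int].filter(_ % 2 == 0)

// Keep.both exposes the publisher probe and the subscriber probe together
val (pub, sub) = TestSource[Int]()
  .via(flowUnderTest)
  .toMat(TestSink[Int]())(Keep.both)
  .run()

sub.request(1)
pub.sendNext(1) // odd, dropped by the filter
pub.sendNext(2) // even, reaches the sink
sub.expectNext(2)

// Failures injected upstream should surface at the sink probe
pub.sendError(new RuntimeException("boom"))
val err = sub.expectError()
assert(err.getMessage == "boom")

system.terminate()
```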


### Graph Stage Testing


Utilities for testing custom GraphStage implementations by wrapping them with monitoring capabilities that emit events to test probes.

```scala { .api }
object TestSinkStage {
  def apply[T, M](stageUnderTest: GraphStageWithMaterializedValue[SinkShape[T], M], probe: TestProbe): Sink[T, M]
}

object TestSourceStage {
  def apply[T, M](stageUnderTest: GraphStageWithMaterializedValue[SourceShape[T], M], probe: TestProbe): Source[T, M]
}

// Graph Stage Messages
object GraphStageMessages {
  sealed trait StageMessage
  case object Push extends StageMessage
  case object Pull extends StageMessage
  case object UpstreamFinish extends StageMessage
  case object DownstreamFinish extends StageMessage
  case class Failure(ex: Throwable) extends StageMessage
  case class StageFailure(operation: StageMessage, exception: Throwable)
}
```

[Graph Stage Testing](./graph-stage-testing.md)
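
A minimal sketch of the wrapping approach, assuming a hypothetical custom sink stage named `IgnoreSink` that discards its input. The `TestProbe` comes from akka-testkit, and the expected message order reflects the assumption that the wrapper reports each push and then the upstream completion.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}
import akka.stream.testkit.{GraphStageMessages, TestSinkStage}
import akka.testkit.TestProbe

implicit val system: ActorSystem = ActorSystem("stage-example")

// Hypothetical stage under test: pulls continuously and drops every element
final class IgnoreSink extends GraphStage[SinkShape[Int]] {
  val in: Inlet[Int] = Inlet("IgnoreSink.in")
  override val shape: SinkShape[Int] = SinkShape(in)

  override def createLogic(attributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
      override def preStart(): Unit = pull(in)
      override def onPush(): Unit = { grab(in); pull(in) }
      setHandler(in, this)
    }
}

// The probe receives a GraphStageMessages event for each handler callback
val probe = TestProbe()
Source(List(1, 2)).runWith(TestSinkStage[Int, NotUsed](new IgnoreSink, probe))

val messages = probe.receiveN(3)
assert(
  messages == Seq(
    GraphStageMessages.Push,
    GraphStageMessages.Push,
    GraphStageMessages.UpstreamFinish))

system.terminate()
```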


### Stream Lifecycle Management


Utilities for asserting that stream processing stages are properly cleaned up after test execution, helping detect resource leaks.

```scala { .api }
// Scala DSL
object StreamTestKit {
  def assertAllStagesStopped[T](block: => T)(implicit materializer: Materializer): T
}

// Java API
object StreamTestKit {
  def assertAllStagesStopped(mat: Materializer): Unit
  def assertAllStagesStopped(system: ClassicActorSystemProvider): Unit
}
```

[Stream Lifecycle Management](./stream-lifecycle.md)
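
As a sketch of the Scala DSL variant, the block below wraps a short-lived stream in `assertAllStagesStopped`, which returns the block's result once the stages have shut down. The summing stream and the timeout are assumptions for illustration; the implicit materializer is expected to be derived from the implicit actor system.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("lifecycle-example")

// The assertion fails if any stage started inside the block is still running afterwards
val total = assertAllStagesStopped {
  val done = Source(1 to 10).runWith(Sink.fold(0)(_ + _))
  Await.result(done, 3.seconds)
}
assert(total == 55)

system.terminate()
```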


## Types

```scala { .api }
// Publisher Events
sealed trait PublisherEvent
case class Subscribe(subscription: Subscription) extends PublisherEvent
case class CancelSubscription(subscription: Subscription, cause: Throwable) extends PublisherEvent
case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent

// Subscriber Events
sealed trait SubscriberEvent
case class OnSubscribe(subscription: Subscription) extends SubscriberEvent
case class OnNext[I](element: I) extends SubscriberEvent
case object OnComplete extends SubscriberEvent
case class OnError(cause: Throwable) extends SubscriberEvent

// Graph Stage Messages
sealed trait StageMessage
case object Push extends StageMessage
case object Pull extends StageMessage
case object UpstreamFinish extends StageMessage
case object DownstreamFinish extends StageMessage
case class Failure(ex: Throwable) extends StageMessage
case class StageFailure(operation: StageMessage, exception: Throwable)

// Probe Classes
object TestPublisher {
  abstract class ManualProbe[I] extends Publisher[I] {
    def expectSubscription(): Subscription
    def expectRequest(): Long
    def expectRequest(n: Long): Unit
    def expectCancellation(): Unit
    def expectCancellationWithCause(): Throwable
    def expectNoMessage(): Unit
    def expectNoMessage(remaining: FiniteDuration): Unit
    def sendNext(element: I): Unit
    def sendComplete(): Unit
    def sendError(cause: Throwable): Unit
    def getSubscriber: Subscriber[_ >: I]
  }

  abstract class Probe[I] extends ManualProbe[I] {
    def sendNext(element: I): Unit
    def sendComplete(): Unit
    def sendError(cause: Throwable): Unit
    def expectRequest(): Long
    def expectRequest(n: Long): Unit
    def pending: Long
  }
}

object TestSubscriber {
  abstract class ManualProbe[I] extends Subscriber[I] {
    def expectSubscription(): Subscription
    def expectNext(): I
    def expectNext(element: I): Unit
    def expectNext(d: FiniteDuration, element: I): Unit
    def expectNext(e1: I, e2: I, es: I*): Unit
    def expectNextN(n: Long): immutable.Seq[I]
    def expectNextN(all: immutable.Seq[I]): Unit
    def expectNextUnordered(e1: I, e2: I, es: I*): Unit
    def expectNextUnorderedN(all: immutable.Seq[I]): Unit
    def expectNextOrError(): Either[Throwable, I]
    def expectNextOrComplete(): Either[OnComplete.type, I]
    def expectComplete(): Unit
    def expectError(): Throwable
    def expectError(cause: Throwable): Unit
    def expectNoMessage(): Unit
    def expectNoMessage(remaining: FiniteDuration): Unit
    def request(n: Long): Unit
    def cancel(): Unit
  }

  abstract class Probe[I] extends ManualProbe[I] {
    def request(n: Long): Unit
    def requestNext(): I
    def requestNext(element: I): Unit
  }
}
```
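
The sketch below exercises a few of the probe methods listed above from both sides of a stream. The element values and the system name are arbitrary; the demand observed by the publisher probe is only checked to be positive, on the assumption that internal buffering may request more than the downstream asked for.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

implicit val system: ActorSystem = ActorSystem("probe-example")

// Subscriber side: pull elements in several different ways
val sub = Source(1 to 5).runWith(TestSink[Int]())
val first = sub.requestNext()   // requests one element and returns it
sub.request(4)
val batch = sub.expectNextN(2)  // collects the next two elements
sub.expectNextUnordered(5, 4)   // order of the remaining two is not checked
sub.expectComplete()

assert(first == 1)
assert(batch == Seq(2, 3))

// Publisher side: observe demand and cancellation coming from downstream
val (pub, downstream) = TestSource[Int]().toMat(TestSink[Int]())(Keep.both).run()
downstream.request(3)
val demand = pub.expectRequest()
assert(demand > 0)

downstream.cancel()
pub.expectCancellation()

system.terminate()
```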