or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

graph-stage-testing.mdindex.mdstream-lifecycle.mdtest-publishers.mdtest-sources-sinks.mdtest-subscribers.md

test-sources-sinks.mddocs/

0

# Test Sources and Sinks

1

2

Factory methods for creating Source and Sink instances that materialize to test probes. These provide the primary interface for stream testing by creating streams that can be controlled and observed through probe interfaces.

3

4

## Capabilities

5

6

### Test Source (Scala DSL)

7

8

Creates Source instances that materialize to TestPublisher.Probe for controllable upstream testing.

9

10

```scala { .api }

11

object TestSource {

12

/**

13

* Creates a Source that materializes to a TestPublisher.Probe

14

* @param system The actor system provider

15

* @return Source that emits elements of type T and materializes to TestPublisher.Probe[T]

16

*/

17

def apply[T]()(implicit system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]]

18

19

/**

20

* Deprecated method - use apply() instead

21

*/

22

@deprecated("Use `TestSource()` with implicit ClassicActorSystemProvider instead.", "2.7.0")

23

def probe[T](implicit system: ActorSystem): Source[T, TestPublisher.Probe[T]]

24

}

25

```

26

27

**Usage Examples:**

28

29

```scala

30

import akka.actor.ActorSystem

31

import akka.stream.scaladsl.{Keep, Sink}

32

import akka.stream.testkit.scaladsl.TestSource

33

34

implicit val system = ActorSystem("test")

35

36

// Create test source and materialize

37

val (probe, future) = TestSource[String]()

38

.toMat(Sink.seq)(Keep.both)

39

.run()

40

41

// Control the source

42

probe.sendNext("hello")

43

probe.sendNext("world")

44

probe.sendComplete()

45

46

// Verify results

47

val result = Await.result(future, 3.seconds)

48

result should contain inOrderOnly ("hello", "world")

49

50

// Pre-materialize for multiple uses

51

val (pub, source) = TestSource[Int]().preMaterialize()

52

pub.sendNext(42)

53

54

val sink1 = source.runWith(Sink.head)

55

val sink2 = source.runWith(Sink.seq)

56

```

57

58

### Test Sink (Scala DSL)

59

60

Creates Sink instances that materialize to TestSubscriber.Probe for controllable downstream testing.

61

62

```scala { .api }

63

object TestSink {

64

/**

65

* Creates a Sink that materializes to a TestSubscriber.Probe

66

* @param system The actor system provider

67

* @return Sink that consumes elements of type T and materializes to TestSubscriber.Probe[T]

68

*/

69

def apply[T]()(implicit system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]]

70

71

/**

72

* Deprecated method - use apply() instead

73

*/

74

@deprecated("Use `TestSink()` with implicit ClassicActorSystemProvider instead.", "2.7.0")

75

def probe[T](implicit system: ActorSystem): Sink[T, TestSubscriber.Probe[T]]

76

}

77

```

78

79

**Usage Examples:**

80

81

```scala

82

import akka.stream.scaladsl.Source

83

import akka.stream.testkit.scaladsl.TestSink

84

85

// Create test sink and materialize

86

val probe = Source(1 to 5)

87

.map(_ * 2)

88

.runWith(TestSink[Int]())

89

90

// Verify elements

91

probe.request(5)

92

probe.expectNext(2, 4, 6, 8, 10)

93

probe.expectComplete()

94

95

// Use with Keep.both for bidirectional testing

96

val (sourceProbe, sinkProbe) = TestSource[String]()

97

.map(_.toUpperCase)

98

.toMat(TestSink[String]())(Keep.both)

99

.run()

100

101

sourceProbe.sendNext("hello")

102

sourceProbe.sendComplete()

103

104

sinkProbe.request(1)

105

sinkProbe.expectNext("HELLO")

106

sinkProbe.expectComplete()

107

```

108

109

### Test Source (Java API)

110

111

Java API for creating test sources.

112

113

```scala { .api }

114

object TestSource {

115

/**

116

* Creates a Source that materializes to a TestPublisher.Probe (Java API)

117

*/

118

def create[T](system: ClassicActorSystemProvider): akka.stream.javadsl.Source[T, TestPublisher.Probe[T]]

119

120

/**

121

* Deprecated method - use create() instead

122

*/

123

@deprecated("Use `TestSource.create` with ClassicActorSystemProvider instead.", "2.7.0")

124

def probe[T](system: ActorSystem): akka.stream.javadsl.Source[T, TestPublisher.Probe[T]]

125

}

126

```

127

128

**Usage Examples (Java):**

129

130

```java

131

import akka.actor.ActorSystem;

132

import akka.stream.javadsl.Sink;

133

import akka.stream.testkit.javadsl.TestSource;

134

import akka.japi.Pair;

135

136

ActorSystem system = ActorSystem.create("test");

137

138

// Create test source

139

Pair<TestPublisher.Probe<String>, CompletionStage<List<String>>> pair =

140

TestSource.<String>create(system)

141

.toMat(Sink.seq(), Keep.both())

142

.run(system);

143

144

TestPublisher.Probe<String> probe = pair.first();

145

CompletionStage<List<String>> future = pair.second();

146

147

// Control the source

148

probe.sendNext("hello");

149

probe.sendNext("world");

150

probe.sendComplete();

151

152

// Verify results

153

List<String> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);

154

assertEquals(Arrays.asList("hello", "world"), result);

155

```

156

157

### Test Sink (Java API)

158

159

Java API for creating test sinks.

160

161

```scala { .api }

162

object TestSink {

163

/**

164

* Creates a Sink that materializes to a TestSubscriber.Probe (Java API)

165

*/

166

def create[T](system: ClassicActorSystemProvider): akka.stream.javadsl.Sink[T, TestSubscriber.Probe[T]]

167

168

/**

169

* Deprecated method - use create() instead

170

*/

171

@deprecated("Use `TestSink.create` with ClassicActorSystemProvider instead.", "2.7.0")

172

def probe[T](system: ActorSystem): akka.stream.javadsl.Sink[T, TestSubscriber.Probe[T]]

173

}

174

```

175

176

**Usage Examples (Java):**

177

178

```java

179

import akka.stream.javadsl.Source;

180

import akka.stream.testkit.javadsl.TestSink;

181

182

// Create test sink

183

TestSubscriber.Probe<Integer> probe =

184

Source.range(1, 5)

185

.map(x -> x * 2)

186

.runWith(TestSink.create(system), system);

187

188

// Verify elements

189

probe.request(5);

190

probe.expectNext(2, 4, 6, 8, 10);

191

probe.expectComplete();

192

```

193

194

## Common Testing Patterns

195

196

### End-to-End Stream Testing

197

198

```scala

199

import akka.stream.scaladsl.Flow

200

201

val (sourceProbe, sinkProbe) = TestSource[String]()

202

.via(Flow[String].map(_.toUpperCase).filter(_.length > 3))

203

.toMat(TestSink[String]())(Keep.both)

204

.run()

205

206

// Test the flow

207

sourceProbe.sendNext("hi") // Should be filtered out

208

sourceProbe.sendNext("hello") // Should pass through

209

sourceProbe.sendNext("world") // Should pass through

210

sourceProbe.sendComplete()

211

212

sinkProbe.request(10)

213

sinkProbe.expectNext("HELLO", "WORLD")

214

sinkProbe.expectComplete()

215

```

216

217

### Testing Backpressure Propagation

218

219

```scala

220

val (sourceProbe, sinkProbe) = TestSource[Int]()

221

.toMat(TestSink[Int]())(Keep.both)

222

.run()

223

224

// Don't request from sink - should not be able to send from source

225

sourceProbe.sendNext(1)

226

sourceProbe.expectRequest() // Will block until downstream requests

227

228

// Now request and verify element flows

229

sinkProbe.request(1)

230

// sourceProbe.expectRequest() returns now

231

sinkProbe.expectNext(1)

232

```

233

234

### Testing Error Propagation

235

236

```scala

237

val (sourceProbe, sinkProbe) = TestSource[String]()

238

.map(s => if (s == "error") throw new RuntimeException("test") else s)

239

.toMat(TestSink[String]())(Keep.both)

240

.run()

241

242

sourceProbe.sendNext("ok")

243

sourceProbe.sendNext("error")

244

245

sinkProbe.request(2)

246

sinkProbe.expectNext("ok")

247

val error = sinkProbe.expectError()

248

error.getMessage should be("test")

249

```

250

251

### Testing Stream Completion

252

253

```scala

254

val (sourceProbe, sinkProbe) = TestSource[String]()

255

.toMat(TestSink[String]())(Keep.both)

256

.run()

257

258

sourceProbe.sendNext("last")

259

sourceProbe.sendComplete()

260

261

sinkProbe.request(1)

262

sinkProbe.expectNext("last")

263

sinkProbe.expectComplete()

264

```

265

266

### Testing Materialize Once, Use Multiple Times

267

268

```scala

269

// Pre-materialize for reuse

270

val (sourceProbe, source) = TestSource[Int]().preMaterialize()

271

272

// Use with multiple sinks

273

val sink1 = source.runWith(Sink.head)

274

val sink2 = source.runWith(Sink.seq)

275

val sink3 = source.runWith(TestSink[Int]())

276

277

// Control single source, affects all sinks

278

sourceProbe.sendNext(42)

279

sourceProbe.sendComplete()

280

281

// Verify all sinks received the element

282

Await.result(sink1, 1.second) should be(42)

283

sink3.request(1)

284

sink3.expectNext(42)

285

sink3.expectComplete()

286

```

287

288

### Testing Complex Flows with Multiple Stages

289

290

```scala

291

val complexFlow = Flow[String]

292

.map(_.toLowerCase)

293

.filter(_.nonEmpty)

294

.groupedWithin(3, 1.second)

295

.map(_.mkString(","))

296

297

val (sourceProbe, sinkProbe) = TestSource[String]()

298

.via(complexFlow)

299

.toMat(TestSink[String]())(Keep.both)

300

.run()

301

302

// Send test data

303

sourceProbe.sendNext("HELLO")

304

sourceProbe.sendNext("") // Will be filtered out

305

sourceProbe.sendNext("WORLD")

306

sourceProbe.sendNext("TEST")

307

sourceProbe.sendComplete()

308

309

// Verify grouped output

310

sinkProbe.request(2)

311

sinkProbe.expectNext("hello,world,test") // Grouped and joined

312

sinkProbe.expectComplete()

313

```