or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-stream-types.mdcustom-stages.mderror-handling.mdindex.mdintegration.mdmaterialization.mdstream-combining.mdstream-control.mdstream-sinks.mdstream-sources.mdstream-transformations.md

stream-sinks.mddocs/

0

# Stream Sinks

1

2

Endpoints for consuming stream elements including collection sinks, side-effect sinks, and integration with external systems. Sinks are the termination points of stream processing pipelines.

3

4

## Capabilities

5

6

### Collection Sinks

7

8

Sinks that collect stream elements into various collection types.

9

10

```scala { .api }

11

object Sink {

12

/**

13

* Collect all elements into an immutable sequence

14

* @return Sink that materializes to Future[immutable.Seq[T]]

15

*/

16

def seq[T]: Sink[T, Future[immutable.Seq[T]]]

17

18

/**

19

* Collect elements into a collection using implicit Factory

20

* @param cbf Factory for creating the collection type

21

* @return Sink that materializes to Future of the collection type

22

*/

23

def collection[T, That](implicit cbf: Factory[T, That with immutable.Iterable[_]]): Sink[T, Future[That]]

24

25

/**

26

* Get only the first element

27

* @return Sink that materializes to Future[T] with the first element

28

*/

29

def head[T]: Sink[T, Future[T]]

30

31

/**

32

* Get the first element if available

33

* @return Sink that materializes to Future[Option[T]]

34

*/

35

def headOption[T]: Sink[T, Future[Option[T]]]

36

37

/**

38

* Get only the last element

39

* @return Sink that materializes to Future[T] with the last element

40

*/

41

def last[T]: Sink[T, Future[T]]

42

43

/**

44

* Get the last element if available

45

* @return Sink that materializes to Future[Option[T]]

46

*/

47

def lastOption[T]: Sink[T, Future[Option[T]]]

48

}

49

```

50

51

**Usage Examples:**

52

53

```scala

54

import akka.stream.scaladsl.{Source, Sink}

55

import scala.concurrent.Future

56

57

// Collect to sequence

58

val seqResult: Future[Seq[Int]] = Source(1 to 10).runWith(Sink.seq)

59

60

// Get first/last elements

61

val firstResult: Future[Int] = Source(1 to 10).runWith(Sink.head)

62

val lastResult: Future[Int] = Source(1 to 10).runWith(Sink.last)

63

64

// Optional first/last

65

val firstOptional: Future[Option[String]] = Source.empty[String].runWith(Sink.headOption)

66

67

// Custom collection

68

val listResult: Future[List[Int]] = Source(1 to 5).runWith(

69

Sink.collection(() => List.newBuilder[Int])

70

)

71

```

72

73

### Aggregation Sinks

74

75

Sinks that perform aggregation operations on stream elements.

76

77

```scala { .api }

78

/**

79

* Fold all elements using an accumulator function

80

* @param zero Initial accumulator value

81

* @param f Function to combine accumulator with each element

82

* @return Sink that materializes to Future with final accumulated value

83

*/

84

def fold[U, T](zero: U)(f: (U, T) => U): Sink[T, Future[U]]

85

86

/**

87

* Fold all elements asynchronously

88

* @param zero Initial accumulator value

89

* @param f Async function to combine accumulator with each element

90

* @return Sink that materializes to Future with final accumulated value

91

*/

92

def foldAsync[U, T](zero: U)(f: (U, T) => Future[U]): Sink[T, Future[U]]

93

94

/**

95

* Reduce elements using a binary operator (requires at least one element)

96

* @param f Binary operator for reduction

97

* @return Sink that materializes to Future with reduced value

98

*/

99

def reduce[T](f: (T, T) => T): Sink[T, Future[T]]

100

101

/**

102

* Find the minimum element

103

* @param ord Ordering for comparison

104

* @return Sink that materializes to Future with minimum element

105

*/

106

def min[T](implicit ord: Ordering[T]): Sink[T, Future[T]]

107

108

/**

109

* Find the maximum element

110

* @param ord Ordering for comparison

111

* @return Sink that materializes to Future with maximum element

112

*/

113

def max[T](implicit ord: Ordering[T]): Sink[T, Future[T]]

114

```

115

116

**Usage Examples:**

117

118

```scala

119

// Sum all numbers

120

val sum: Future[Int] = Source(1 to 10).runWith(Sink.fold(0)(_ + _))

121

122

// Concatenate strings

123

val combined: Future[String] = Source(List("Hello", " ", "World"))

124

.runWith(Sink.reduce(_ + _))

125

126

// Find min/max

127

val minimum: Future[Int] = Source(List(5, 2, 8, 1, 9)).runWith(Sink.min)

128

val maximum: Future[Int] = Source(List(5, 2, 8, 1, 9)).runWith(Sink.max)

129

130

// Async aggregation

131

val asyncSum: Future[Int] = Source(1 to 10).runWith(

132

Sink.foldAsync(0) { (acc, elem) =>

133

Future.successful(acc + elem)

134

}

135

)

136

```

137

138

### Side-Effect Sinks

139

140

Sinks that perform side effects without collecting elements.

141

142

```scala { .api }

143

/**

144

* Execute a side effect for each element

145

* @param f Function to execute for each element

146

* @return Sink that materializes to Future[Done] when complete

147

*/

148

def foreach[T](f: T => Unit): Sink[T, Future[Done]]

149

150

/**

151

* Execute an async side effect for each element

152

* @param parallelism Maximum number of concurrent operations

153

* @param f Async function to execute for each element

154

* @return Sink that materializes to Future[Done] when complete

155

*/

156

def foreachAsync[T](parallelism: Int)(f: T => Future[_]): Sink[T, Future[Done]]

157

158

/**

159

* Execute a side effect for each element in parallel

160

* @param parallelism Maximum number of concurrent operations

161

* @param f Function to execute for each element

162

* @return Sink that materializes to Future[Done] when complete

163

*/

164

def foreachParallel[T](parallelism: Int)(f: T => Unit): Sink[T, Future[Done]]

165

166

/**

167

* Ignore all elements

168

* @return Sink that materializes to Future[Done] and discards all elements

169

*/

170

def ignore: Sink[Any, Future[Done]]

171

172

/**

173

* Sink that is immediately cancelled

174

* @return Sink that cancels upstream immediately

175

*/

176

def cancelled[T]: Sink[T, NotUsed]

177

178

/**

179

* Execute function when stream completes or fails

180

* @param callback Function called when stream terminates

181

* @return Sink that materializes to Future[Done]

182

*/

183

def onComplete[T](callback: Try[Done] => Unit): Sink[T, Future[Done]]

184

```

185

186

**Usage Examples:**

187

188

```scala

189

import akka.Done

190

import scala.util.{Success, Failure}

191

192

// Print each element

193

Source(1 to 5).runWith(Sink.foreach(println))

194

195

// Async processing

196

Source(List("url1", "url2", "url3")).runWith(

197

Sink.foreachAsync(2) { url =>

198

// Simulate async HTTP call

199

Future {

200

println(s"Processing $url")

201

Thread.sleep(100)

202

}

203

}

204

)

205

206

// Ignore all elements (useful for testing)

207

Source(1 to 100).runWith(Sink.ignore)

208

209

// Handle completion

210

Source(1 to 5).runWith(Sink.onComplete {

211

case Success(Done) => println("Stream completed successfully")

212

case Failure(ex) => println(s"Stream failed: $ex")

213

})

214

```

215

216

### Actor Integration Sinks

217

218

Sinks that integrate with Akka actors for sending elements as messages.

219

220

```scala { .api }

221

/**

222

* Send elements as messages to an actor

223

* @param ref Target actor reference

224

* @return Sink that sends each element as a message

225

*/

226

def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed]

227

228

/**

229

* Send elements to an actor with backpressure support

230

* @param ref Target actor reference

231

* @param messageAdapter Function to wrap elements in messages

232

* @param initMessage Optional initialization message

233

* @param ackMessage Message that actor sends to acknowledge receipt

234

* @param onCompleteMessage Message sent when stream completes

235

* @param onFailureMessage Function to create failure message

236

* @return Sink with backpressure control

237

*/

238

def actorRefWithBackpressure[T](

239

ref: ActorRef,

240

messageAdapter: T => Any,

241

initMessage: Option[Any] = None,

242

ackMessage: Any,

243

onCompleteMessage: Any,

244

onFailureMessage: Throwable => Any = Status.Failure(_)

245

): Sink[T, NotUsed]

246

```

247

248

**Usage Examples:**

249

250

```scala

251

import akka.actor.{ActorRef, ActorSystem, Props}

252

253

// Simple actor sink

254

val actorRef: ActorRef = system.actorOf(Props[ProcessingActor])

255

Source(1 to 10).runWith(Sink.actorRef(actorRef, "complete"))

256

257

// Backpressure-aware actor sink

258

Source(1 to 100).runWith(

259

Sink.actorRefWithBackpressure(

260

ref = actorRef,

261

messageAdapter = (elem: Int) => ProcessElement(elem),

262

ackMessage = "ack",

263

onCompleteMessage = "complete"

264

)

265

)

266

```

267

268

### Queue Sinks

269

270

Sinks that provide dynamic pull-based consumption.

271

272

```scala { .api }

273

/**

274

* Create a sink that materializes to a queue for pulling elements

275

* @return Sink that materializes to SinkQueue for pulling elements on demand

276

*/

277

def queue[T](): Sink[T, SinkQueue[T]]

278

279

/**

280

* Interface for pulling elements from a queue-backed sink

281

*/

282

trait SinkQueue[T] {

283

/**

284

* Pull the next element from the stream

285

* @return Future with the next element or completion/failure

286

*/

287

def pull(): Future[Option[T]]

288

289

/**

290

* Cancel the sink and complete the stream

291

*/

292

def cancel(): Unit

293

}

294

```

295

296

**Usage Examples:**

297

298

```scala

299

import akka.stream.SinkQueue

300

301

// Pull-based consumption

302

val (queue: SinkQueue[Int], source: Source[Int, NotUsed]) =

303

Source(1 to 10)

304

.toMat(Sink.queue())(Keep.both)

305

.preMaterialize()

306

307

// Pull elements on demand

308

def pullNext(): Unit = {

309

queue.pull().foreach {

310

case Some(element) =>

311

println(s"Got element: $element")

312

pullNext() // Pull next element

313

case None =>

314

println("Stream completed")

315

}

316

}

317

pullNext()

318

```

319

320

### File and IO Sinks

321

322

Sinks for writing to files and other IO destinations.

323

324

```scala { .api }

325

object FileIO {

326

/**

327

* Write ByteString elements to a file

328

* @param f Path to the target file

329

* @param options File open options

330

* @return Sink that materializes to Future[IOResult]

331

*/

332

def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]]

333

}

334

335

/**

336

* Result of IO operations containing byte count and completion status

337

*/

338

final case class IOResult(count: Long, status: Try[Done])

339

```

340

341

**Usage Examples:**

342

343

```scala

344

import akka.stream.scaladsl.FileIO

345

import akka.util.ByteString

346

import java.nio.file.Paths

347

348

// Write to file

349

val filePath = Paths.get("output.txt")

350

Source(List("Hello", "World", "!"))

351

.map(s => ByteString(s + "\n"))

352

.runWith(FileIO.toPath(filePath))

353

.map { result =>

354

println(s"Written ${result.count} bytes")

355

}

356

```

357

358

### Custom and Transformation Sinks

359

360

Operations for creating custom sinks and transforming existing ones.

361

362

```scala { .api }

363

/**

364

* Transform the input type of a sink

365

* @param f Function to transform input elements

366

* @return Sink that accepts transformed input type

367

*/

368

def contramap[In2](f: In2 => In): Sink[In2, Mat]

369

370

/**

371

* Transform the materialized value of a sink

372

* @param f Function to transform materialized value

373

* @return Sink with transformed materialized value

374

*/

375

def mapMaterializedValue[Mat2](f: Mat => Mat2): Sink[In, Mat2]

376

377

/**

378

* Pre-materialize a sink to get both the materialized value and a new sink

379

* @return Tuple of materialized value and equivalent sink

380

*/

381

def preMaterialize()(implicit materializer: Materializer): (Mat, Sink[In, NotUsed])

382

383

/**

384

* Add attributes to a sink

385

* @param attrs Attributes to add

386

* @return Sink with added attributes

387

*/

388

def withAttributes(attrs: Attributes): Sink[In, Mat]

389

```

390

391

**Usage Examples:**

392

393

```scala

394

// Transform input type

395

val intSink: Sink[Int, Future[Seq[Int]]] = Sink.seq[Int]

396

val stringSink: Sink[String, Future[Seq[Int]]] = intSink.contramap(_.toInt)

397

398

// Transform materialized value

399

val countSink: Sink[String, Future[Int]] = Sink.seq[String]

400

.mapMaterializedValue(_.map(_.length))

401

402

// Pre-materialize for reuse

403

val (future: Future[Seq[Int]], reusableSink: Sink[Int, NotUsed]) =

404

Sink.seq[Int].preMaterialize()

405

```

406

407

## Types

408

409

```scala { .api }

410

// Queue interface for pull-based consumption

411

trait SinkQueue[T] {

412

def pull(): Future[Option[T]]

413

def cancel(): Unit

414

}

415

416

// IO operation result

417

final case class IOResult(count: Long, status: Try[Done]) {

418

def wasSuccessful: Boolean = status.isSuccess

419

}

420

421

// Completion marker

422

sealed abstract class Done

423

case object Done extends Done

424

```