0
# Stream Sinks
1
2
Endpoints for consuming stream elements including collection sinks, side-effect sinks, and integration with external systems. Sinks are the termination points of stream processing pipelines.
3
4
## Capabilities
5
6
### Collection Sinks
7
8
Sinks that collect stream elements into various collection types.
9
10
```scala { .api }
11
object Sink {
12
/**
13
* Collect all elements into an immutable sequence
14
* @return Sink that materializes to Future[immutable.Seq[T]]
15
*/
16
def seq[T]: Sink[T, Future[immutable.Seq[T]]]
17
18
/**
19
* Collect elements into a collection using implicit Factory
20
* @param cbf Factory for creating the collection type
21
* @return Sink that materializes to Future of the collection type
22
*/
23
def collection[T, That](implicit cbf: Factory[T, That with immutable.Iterable[_]]): Sink[T, Future[That]]
24
25
/**
26
* Get only the first element
27
* @return Sink that materializes to Future[T] with the first element
28
*/
29
def head[T]: Sink[T, Future[T]]
30
31
/**
32
* Get the first element if available
33
* @return Sink that materializes to Future[Option[T]]
34
*/
35
def headOption[T]: Sink[T, Future[Option[T]]]
36
37
/**
38
* Get only the last element
39
* @return Sink that materializes to Future[T] with the last element
40
*/
41
def last[T]: Sink[T, Future[T]]
42
43
/**
44
* Get the last element if available
45
* @return Sink that materializes to Future[Option[T]]
46
*/
47
def lastOption[T]: Sink[T, Future[Option[T]]]
48
}
49
```
50
51
**Usage Examples:**
52
53
```scala
54
import akka.stream.scaladsl.{Source, Sink}
55
import scala.concurrent.Future
56
57
// Collect to sequence
58
val seqResult: Future[Seq[Int]] = Source(1 to 10).runWith(Sink.seq)
59
60
// Get first/last elements
61
val firstResult: Future[Int] = Source(1 to 10).runWith(Sink.head)
62
val lastResult: Future[Int] = Source(1 to 10).runWith(Sink.last)
63
64
// Optional first/last
65
val firstOptional: Future[Option[String]] = Source.empty[String].runWith(Sink.headOption)
66
67
// Custom collection
68
val listResult: Future[List[Int]] = Source(1 to 5).runWith(
69
Sink.collection(() => List.newBuilder[Int])
70
)
71
```
72
73
### Aggregation Sinks
74
75
Sinks that perform aggregation operations on stream elements.
76
77
```scala { .api }
78
/**
79
* Fold all elements using an accumulator function
80
* @param zero Initial accumulator value
81
* @param f Function to combine accumulator with each element
82
* @return Sink that materializes to Future with final accumulated value
83
*/
84
def fold[U, T](zero: U)(f: (U, T) => U): Sink[T, Future[U]]
85
86
/**
87
* Fold all elements asynchronously
88
* @param zero Initial accumulator value
89
* @param f Async function to combine accumulator with each element
90
* @return Sink that materializes to Future with final accumulated value
91
*/
92
def foldAsync[U, T](zero: U)(f: (U, T) => Future[U]): Sink[T, Future[U]]
93
94
/**
95
* Reduce elements using a binary operator (requires at least one element)
96
* @param f Binary operator for reduction
97
* @return Sink that materializes to Future with reduced value
98
*/
99
def reduce[T](f: (T, T) => T): Sink[T, Future[T]]
100
101
/**
102
* Find the minimum element
103
* @param ord Ordering for comparison
104
* @return Sink that materializes to Future with minimum element
105
*/
106
def min[T](implicit ord: Ordering[T]): Sink[T, Future[T]]
107
108
/**
109
* Find the maximum element
110
* @param ord Ordering for comparison
111
* @return Sink that materializes to Future with maximum element
112
*/
113
def max[T](implicit ord: Ordering[T]): Sink[T, Future[T]]
114
```
115
116
**Usage Examples:**
117
118
```scala
119
// Sum all numbers
120
val sum: Future[Int] = Source(1 to 10).runWith(Sink.fold(0)(_ + _))
121
122
// Concatenate strings
123
val combined: Future[String] = Source(List("Hello", " ", "World"))
124
.runWith(Sink.reduce(_ + _))
125
126
// Find min/max
127
val minimum: Future[Int] = Source(List(5, 2, 8, 1, 9)).runWith(Sink.min)
128
val maximum: Future[Int] = Source(List(5, 2, 8, 1, 9)).runWith(Sink.max)
129
130
// Async aggregation
131
val asyncSum: Future[Int] = Source(1 to 10).runWith(
132
Sink.foldAsync(0) { (acc, elem) =>
133
Future.successful(acc + elem)
134
}
135
)
136
```
137
138
### Side-Effect Sinks
139
140
Sinks that perform side effects without collecting elements.
141
142
```scala { .api }
143
/**
144
* Execute a side effect for each element
145
* @param f Function to execute for each element
146
* @return Sink that materializes to Future[Done] when complete
147
*/
148
def foreach[T](f: T => Unit): Sink[T, Future[Done]]
149
150
/**
151
* Execute an async side effect for each element
152
* @param parallelism Maximum number of concurrent operations
153
* @param f Async function to execute for each element
154
* @return Sink that materializes to Future[Done] when complete
155
*/
156
def foreachAsync[T](parallelism: Int)(f: T => Future[_]): Sink[T, Future[Done]]
157
158
/**
159
* Execute a side effect for each element in parallel
160
* @param parallelism Maximum number of concurrent operations
161
* @param f Function to execute for each element
162
* @return Sink that materializes to Future[Done] when complete
163
*/
164
def foreachParallel[T](parallelism: Int)(f: T => Unit): Sink[T, Future[Done]]
165
166
/**
167
* Ignore all elements
168
* @return Sink that materializes to Future[Done] and discards all elements
169
*/
170
def ignore: Sink[Any, Future[Done]]
171
172
/**
173
* Sink that is immediately cancelled
174
* @return Sink that cancels upstream immediately
175
*/
176
def cancelled[T]: Sink[T, NotUsed]
177
178
/**
179
* Execute function when stream completes or fails
180
* @param callback Function called when stream terminates
181
* @return Sink that materializes to Future[Done]
182
*/
183
def onComplete[T](callback: Try[Done] => Unit): Sink[T, Future[Done]]
184
```
185
186
**Usage Examples:**
187
188
```scala
189
import akka.Done
190
import scala.util.{Success, Failure}
191
192
// Print each element
193
Source(1 to 5).runWith(Sink.foreach(println))
194
195
// Async processing
196
Source(List("url1", "url2", "url3")).runWith(
197
Sink.foreachAsync(2) { url =>
198
// Simulate async HTTP call
199
Future {
200
println(s"Processing $url")
201
Thread.sleep(100)
202
}
203
}
204
)
205
206
// Ignore all elements (useful for testing)
207
Source(1 to 100).runWith(Sink.ignore)
208
209
// Handle completion
210
Source(1 to 5).runWith(Sink.onComplete {
211
case Success(Done) => println("Stream completed successfully")
212
case Failure(ex) => println(s"Stream failed: $ex")
213
})
214
```
215
216
### Actor Integration Sinks
217
218
Sinks that integrate with Akka actors for sending elements as messages.
219
220
```scala { .api }
221
/**
222
* Send elements as messages to an actor
223
* @param ref Target actor reference
224
* @return Sink that sends each element as a message
225
*/
226
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed]
227
228
/**
229
* Send elements to an actor with backpressure support
230
* @param ref Target actor reference
231
* @param messageAdapter Function to wrap elements in messages
232
* @param initMessage Optional initialization message
233
* @param ackMessage Message that actor sends to acknowledge receipt
234
* @param onCompleteMessage Message sent when stream completes
235
* @param onFailureMessage Function to create failure message
236
* @return Sink with backpressure control
237
*/
238
def actorRefWithBackpressure[T](
239
ref: ActorRef,
240
messageAdapter: T => Any,
241
initMessage: Option[Any] = None,
242
ackMessage: Any,
243
onCompleteMessage: Any,
244
onFailureMessage: Throwable => Any = Status.Failure(_)
245
): Sink[T, NotUsed]
246
```
247
248
**Usage Examples:**
249
250
```scala
251
import akka.actor.{ActorRef, ActorSystem, Props}
252
253
// Simple actor sink
254
val actorRef: ActorRef = system.actorOf(Props[ProcessingActor])
255
Source(1 to 10).runWith(Sink.actorRef(actorRef, "complete"))
256
257
// Backpressure-aware actor sink
258
Source(1 to 100).runWith(
259
Sink.actorRefWithBackpressure(
260
ref = actorRef,
261
messageAdapter = (elem: Int) => ProcessElement(elem),
262
ackMessage = "ack",
263
onCompleteMessage = "complete"
264
)
265
)
266
```
267
268
### Queue Sinks
269
270
Sinks that provide dynamic pull-based consumption.
271
272
```scala { .api }
273
/**
274
* Create a sink that materializes to a queue for pulling elements
275
* @return Sink that materializes to SinkQueue for pulling elements on demand
276
*/
277
def queue[T](): Sink[T, SinkQueue[T]]
278
279
/**
280
* Interface for pulling elements from a queue-backed sink
281
*/
282
trait SinkQueue[T] {
283
/**
284
* Pull the next element from the stream
285
* @return Future with the next element or completion/failure
286
*/
287
def pull(): Future[Option[T]]
288
289
/**
290
* Cancel the sink and complete the stream
291
*/
292
def cancel(): Unit
293
}
294
```
295
296
**Usage Examples:**
297
298
```scala
299
import akka.stream.SinkQueue
300
301
// Pull-based consumption
302
val (queue: SinkQueue[Int], source: Source[Int, NotUsed]) =
303
Source(1 to 10)
304
.toMat(Sink.queue())(Keep.both)
305
.preMaterialize()
306
307
// Pull elements on demand
308
def pullNext(): Unit = {
309
queue.pull().foreach {
310
case Some(element) =>
311
println(s"Got element: $element")
312
pullNext() // Pull next element
313
case None =>
314
println("Stream completed")
315
}
316
}
317
pullNext()
318
```
319
320
### File and IO Sinks
321
322
Sinks for writing to files and other IO destinations.
323
324
```scala { .api }
325
object FileIO {
326
/**
327
* Write ByteString elements to a file
328
* @param f Path to the target file
329
* @param options File open options
330
* @return Sink that materializes to Future[IOResult]
331
*/
332
def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]]
333
}
334
335
/**
336
* Result of IO operations containing byte count and completion status
337
*/
338
final case class IOResult(count: Long, status: Try[Done])
339
```
340
341
**Usage Examples:**
342
343
```scala
344
import akka.stream.scaladsl.FileIO
345
import akka.util.ByteString
346
import java.nio.file.Paths
347
348
// Write to file
349
val filePath = Paths.get("output.txt")
350
Source(List("Hello", "World", "!"))
351
.map(s => ByteString(s + "\n"))
352
.runWith(FileIO.toPath(filePath))
353
.map { result =>
354
println(s"Written ${result.count} bytes")
355
}
356
```
357
358
### Custom and Transformation Sinks
359
360
Operations for creating custom sinks and transforming existing ones.
361
362
```scala { .api }
363
/**
364
* Transform the input type of a sink
365
* @param f Function to transform input elements
366
* @return Sink that accepts transformed input type
367
*/
368
def contramap[In2](f: In2 => In): Sink[In2, Mat]
369
370
/**
371
* Transform the materialized value of a sink
372
* @param f Function to transform materialized value
373
* @return Sink with transformed materialized value
374
*/
375
def mapMaterializedValue[Mat2](f: Mat => Mat2): Sink[In, Mat2]
376
377
/**
378
* Pre-materialize a sink to get both the materialized value and a new sink
379
* @return Tuple of materialized value and equivalent sink
380
*/
381
def preMaterialize()(implicit materializer: Materializer): (Mat, Sink[In, NotUsed])
382
383
/**
384
* Add attributes to a sink
385
* @param attrs Attributes to add
386
* @return Sink with added attributes
387
*/
388
def withAttributes(attrs: Attributes): Sink[In, Mat]
389
```
390
391
**Usage Examples:**
392
393
```scala
394
// Transform input type
395
val intSink: Sink[Int, Future[Seq[Int]]] = Sink.seq[Int]
396
val stringSink: Sink[String, Future[Seq[Int]]] = intSink.contramap(_.toInt)
397
398
// Transform materialized value
399
val countSink: Sink[String, Future[Int]] = Sink.seq[String]
400
.mapMaterializedValue(_.map(_.length))
401
402
// Pre-materialize for reuse
403
val (future: Future[Seq[Int]], reusableSink: Sink[Int, NotUsed]) =
404
Sink.seq[Int].preMaterialize()
405
```
406
407
## Types
408
409
```scala { .api }
410
// Queue interface for pull-based consumption
411
trait SinkQueue[T] {
412
def pull(): Future[Option[T]]
413
def cancel(): Unit
414
}
415
416
// IO operation result
417
final case class IOResult(count: Long, status: Try[Done]) {
418
def wasSuccessful: Boolean = status.isSuccess
419
}
420
421
// Completion marker
422
sealed abstract class Done
423
case object Done extends Done
424
```