# Akka Stream

Akka Stream is a reactive streaming library built on top of the Akka actor framework that implements the Reactive Streams specification. It provides a high-level DSL for building resilient, distributed, and concurrent stream processing applications with back-pressure support and composable stream processing operators.

## Package Information

- **Package Name**: com.typesafe.akka:akka-stream_2.12
- **Package Type**: maven
- **Language**: Scala
- **Installation**:
  - sbt: `libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.8.8"`
  - Maven: `<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-stream_2.12</artifactId><version>2.8.8</version></dependency>`

## Core Imports

### Scala DSL

```scala
import akka.stream.scaladsl.{Source, Flow, Sink, RunnableGraph}
import akka.stream.Materializer
import akka.NotUsed
```

### Java DSL

```java
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.Materializer;
```

## Basic Usage

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, Sink}
import akka.stream.Materializer

implicit val system: ActorSystem = ActorSystem("stream-system")
implicit val materializer: Materializer = Materializer(system)

// Create a source of numbers 1 to 10
val source = Source(1 to 10)

// Transform and process the stream
val result = source
  .map(_ * 2)
  .filter(_ > 10)
  .runWith(Sink.seq)

// result is a Future[Seq[Int]] containing [12, 14, 16, 18, 20]
```

## Architecture

Akka Stream is built around several key architectural concepts:

- **Reactive Streams**: Full compliance with the Reactive Streams specification for asynchronous stream processing with back-pressure
- **Graph DSL**: High-level declarative API for describing stream processing topologies as graphs with sources, flows, and sinks
- **Materialization**: Two-phase execution where stream blueprints are first defined, then materialized into running streams
- **Back-pressure**: Automatic flow control that keeps fast producers from overwhelming slower downstream components
- **Actor Integration**: Built on Akka actors for distribution, fault tolerance, and scalability
- **Type Safety**: Strong typing throughout the stream processing pipeline with Scala's type system

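The two-phase materialization model listed above can be sketched as follows. This is a minimal illustration, assuming Akka 2.6 or later (where an implicit `ActorSystem` supplies the materializer); `BlueprintExample`, `sumGraph`, and `runTwice` are hypothetical names chosen for this sketch.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BlueprintExample {
  // A blueprint only: defining the graph starts no processing.
  val sumGraph: RunnableGraph[Future[Int]] =
    Source(1 to 5).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

  // Each run() materializes a separate running stream from the same blueprint.
  def runTwice()(implicit system: ActorSystem): (Int, Int) = {
    val first = Await.result(sumGraph.run(), 3.seconds)
    val second = Await.result(sumGraph.run(), 3.seconds)
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("blueprint-example")
    try println(runTwice()) // prints (15,15)
    finally system.terminate()
  }
}
```

Because the blueprint is immutable, it can be shared and materialized any number of times; the blocking `Await` calls are only there to keep the sketch short.
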
## Capabilities

### Core Stream Types

The fundamental building blocks for creating and composing stream processing pipelines. These types define the structure and flow of data through reactive streams.

```scala { .api }
// Source: Stream with one output, no inputs
final class Source[+Out, +Mat] extends Graph[SourceShape[Out], Mat]

// Flow: Stream processing step with one input and one output
final class Flow[-In, +Out, +Mat] extends Graph[FlowShape[In, Out], Mat]

// Sink: Stream endpoint that consumes elements
final class Sink[-In, +Mat] extends Graph[SinkShape[In], Mat]

// RunnableGraph: Complete stream ready for execution
final case class RunnableGraph[+Mat] extends Graph[ClosedShape, Mat]
```

[Core Stream Types](./core-stream-types.md)

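As a rough sketch of how these four types fit together, the example below connects a `Source` through a `Flow` to a `Sink` and runs the resulting `RunnableGraph`. It assumes Akka 2.6 or later; `CoreTypesExample` and its member names are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object CoreTypesExample {
  val numbers: Source[Int, NotUsed] = Source(1 to 3)
  val toLabel: Flow[Int, String, NotUsed] = Flow[Int].map(n => s"item-$n")
  val collect: Sink[String, Future[immutable.Seq[String]]] = Sink.seq[String]

  // Connecting source, flow, and sink closes every port, giving a RunnableGraph.
  // Keep.right selects the sink's materialized value (the Future of results).
  val graph: RunnableGraph[Future[immutable.Seq[String]]] =
    numbers.via(toLabel).toMat(collect)(Keep.right)

  def labels()(implicit system: ActorSystem): immutable.Seq[String] =
    Await.result(graph.run(), 3.seconds)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("core-types-example")
    try println(labels()) finally system.terminate()
  }
}
```
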
### Stream Sources

Factory methods and utilities for creating stream sources from various data sources including collections, futures, actors, and external systems.

```scala { .api }
object Source {
  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed]
  def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed]
  def future[T](futureElement: Future[T]): Source[T, NotUsed]
  def single[T](element: T): Source[T, NotUsed]
  def empty[T]: Source[T, NotUsed]
  def repeat[T](element: T): Source[T, NotUsed]
}
```

[Stream Sources](./stream-sources.md)

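A few of these factories in use, as a hedged sketch assuming Akka 2.6 or later. `SourcesExample` and the helper `collect` are hypothetical names; infinite sources such as `Source.repeat` are bounded with `take` so the streams complete.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SourcesExample {
  val fromCollection: Source[Int, NotUsed] = Source(List(1, 2, 3))
  // The iterator factory is invoked once per materialization.
  val fromIterator: Source[Int, NotUsed] =
    Source.fromIterator(() => Iterator.from(10)).take(3)
  val fromFuture: Source[String, NotUsed] = Source.future(Future.successful("ready"))
  val repeated: Source[String, NotUsed] = Source.repeat("tick").take(2)

  // Hypothetical helper: runs a source to completion and returns its elements.
  def collect[T, M](source: Source[T, M])(implicit system: ActorSystem): immutable.Seq[T] =
    Await.result(source.runWith(Sink.seq[T]), 3.seconds)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sources-example")
    try {
      println(collect(fromCollection))
      println(collect(fromIterator))
      println(collect(fromFuture))
      println(collect(repeated))
    } finally system.terminate()
  }
}
```
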
### Stream Transformations

Core transformation operators for manipulating, filtering, grouping, and routing stream elements with strong type safety and back-pressure support.

```scala { .api }
// Basic transformations
def map[T2](f: Out => T2): Source[T2, Mat]
def filter(p: Out => Boolean): Source[Out, Mat]
def collect[T2](pf: PartialFunction[Out, T2]): Source[T2, Mat]

// Async transformations
def mapAsync[T2](parallelism: Int)(f: Out => Future[T2]): Source[T2, Mat]
def mapAsyncUnordered[T2](parallelism: Int)(f: Out => Future[T2]): Source[T2, Mat]

// Grouping and batching
def grouped(n: Int): Source[immutable.Seq[Out], Mat]
def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Source[Out, Mat]#Repr, RunnableGraph[Mat]]
```

[Stream Transformations](./stream-transformations.md)

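The sketch below exercises three of these operators, assuming Akka 2.6 or later; `TransformExample` and its method names are hypothetical. Note that `mapAsync` emits results in upstream order even when the futures complete out of order, whereas `mapAsyncUnordered` does not.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object TransformExample {
  // mapAsync: at most two futures in flight, output order matches input order.
  def doubledAsync()(implicit system: ActorSystem): immutable.Seq[Int] = {
    import system.dispatcher // execution context for the futures
    val result = Source(1 to 5).mapAsync(parallelism = 2)(n => Future(n * 2)).runWith(Sink.seq)
    Await.result(result, 3.seconds)
  }

  // grouped: fixed-size batches; the last batch may be smaller.
  def batches()(implicit system: ActorSystem): immutable.Seq[immutable.Seq[Int]] =
    Await.result(Source(1 to 5).grouped(2).runWith(Sink.seq), 3.seconds)

  // collect: filter and map in one step through a partial function.
  def evens()(implicit system: ActorSystem): immutable.Seq[String] = {
    val result = Source(1 to 6).collect { case n if n % 2 == 0 => s"even-$n" }.runWith(Sink.seq)
    Await.result(result, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("transform-example")
    try {
      println(doubledAsync())
      println(batches())
      println(evens())
    } finally system.terminate()
  }
}
```
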
### Stream Combining

Operations for merging, zipping, concatenating, and broadcasting streams to create complex data flow topologies.

```scala { .api }
// Combining sources
def merge[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]
def concat[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]
def zip[U](other: Graph[SourceShape[U], _]): Source[(Out, U), Mat]

// Broadcasting and balancing: fan-out stages, typically wired with the GraphDSL
object Broadcast {
  def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Graph[UniformFanOutShape[T, T], NotUsed]
}
object Balance {
  def apply[T](outputPorts: Int, waitForAllDownstreams: Boolean = false): Graph[UniformFanOutShape[T, T], NotUsed]
}
```

[Stream Combining](./stream-combining.md)

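A short sketch of the source-combining operators, assuming Akka 2.6 or later; `CombineExample` and its method names are hypothetical. `concat` and `zip` produce a deterministic order, while `merge` interleaves elements in whatever order they arrive, so the example sorts the merged result before using it.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object CombineExample {
  // concat: the second source starts only after the first completes.
  def concatenated()(implicit system: ActorSystem): immutable.Seq[Int] =
    Await.result(Source(1 to 2).concat(Source(3 to 4)).runWith(Sink.seq), 3.seconds)

  // zip: pairs elements one by one and completes when either side completes.
  def zipped()(implicit system: ActorSystem): immutable.Seq[(Int, String)] =
    Await.result(Source(1 to 3).zip(Source(List("a", "b", "c"))).runWith(Sink.seq), 3.seconds)

  // merge: emission order is not guaranteed, hence the sort.
  def mergedSorted()(implicit system: ActorSystem): immutable.Seq[Int] =
    Await.result(Source(1 to 3).merge(Source(4 to 6)).runWith(Sink.seq), 3.seconds).sorted

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("combine-example")
    try {
      println(concatenated())
      println(zipped())
      println(mergedSorted())
    } finally system.terminate()
  }
}
```
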
### Stream Sinks

Endpoints for consuming stream elements including collection sinks, side-effect sinks, and integration with external systems.

```scala { .api }
object Sink {
  def seq[T]: Sink[T, Future[immutable.Seq[T]]]
  def head[T]: Sink[T, Future[T]]
  def foreach[T](f: T => Unit): Sink[T, Future[Done]]
  def fold[U, T](zero: U)(f: (U, T) => U): Sink[T, Future[U]]
  def ignore: Sink[Any, Future[Done]]
}
```

[Stream Sinks](./stream-sinks.md)

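The sinks above differ mainly in what their materialized `Future` carries. A minimal sketch, assuming Akka 2.6 or later; `SinksExample` and its method names are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

object SinksExample {
  // fold: the future completes with the final accumulator.
  def total()(implicit system: ActorSystem): Int =
    Await.result(Source(1 to 4).runWith(Sink.fold[Int, Int](0)(_ + _)), 3.seconds)

  // head: the future completes with the first element; upstream is then cancelled.
  def first()(implicit system: ActorSystem): Int =
    Await.result(Source(1 to 4).runWith(Sink.head[Int]), 3.seconds)

  // foreach: runs a side effect per element; the future completes with Done.
  def sideEffectSum()(implicit system: ActorSystem): Int = {
    val seen = new AtomicInteger(0)
    val done = Source(1 to 4).runWith(Sink.foreach[Int] { n => seen.addAndGet(n); () })
    Await.result(done, 3.seconds)
    seen.get()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sinks-example")
    try println((total(), first(), sideEffectSum())) // prints (10,1,10)
    finally system.terminate()
  }
}
```
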
### Materialization and Execution

System for converting stream blueprints into running streams, managing resources, and controlling materialized values.

```scala { .api }
abstract class Materializer {
  def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
  def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable
}

// Materialization control
def run(): Mat
def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2]): Mat2
def mapMaterializedValue[Mat2](f: Mat => Mat2): Source[Out, Mat2]
```

[Materialization and Execution](./materialization.md)

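To illustrate how materialized values are controlled, the sketch below replaces a source's `NotUsed` with a label via `mapMaterializedValue` and keeps both sides' values with `Keep.both`. It assumes Akka 2.6 or later; `MatValueExample` and `labelAndSum` are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object MatValueExample {
  def labelAndSum()(implicit system: ActorSystem): (String, Int) = {
    // Swap the source's NotUsed for an arbitrary value of our choosing.
    val labelled: Source[Int, String] =
      Source(1 to 3).mapMaterializedValue(_ => "numbers")

    // Keep.both returns the source's and the sink's materialized values as a pair.
    val (label, sum) =
      labelled.toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.both).run()

    (label, Await.result(sum, 3.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("mat-value-example")
    try println(labelAndSum()) // prints (numbers,6)
    finally system.terminate()
  }
}
```
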
### Error Handling and Supervision

Strategies for handling failures, implementing supervision, and recovering from errors in stream processing pipelines.

```scala { .api }
object Supervision {
  type Decider = Throwable => Directive

  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
}

// Error handling operators
def recover[U >: Out](pf: PartialFunction[Throwable, U]): Source[U, Mat]
def recoverWithRetries[U >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[U], NotUsed]]): Source[U, Mat]
```

[Error Handling](./error-handling.md)

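The difference between recovering and supervising can be sketched with a division that fails on one element. This assumes Akka 2.6 or later and uses `ActorAttributes.supervisionStrategy` to attach the decider; `ErrorHandlingExample` and its member names are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object ErrorHandlingExample {
  // recover: emit one fallback element, then complete the stream.
  def recovered()(implicit system: ActorSystem): immutable.Seq[Int] = {
    val result = Source(List(1, 2, 0, 4))
      .map(10 / _)
      .recover { case _: ArithmeticException => -1 }
      .runWith(Sink.seq)
    Await.result(result, 3.seconds)
  }

  // Resume drops the failing element and keeps the stream running.
  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _                      => Supervision.Stop
  }

  def resumed()(implicit system: ActorSystem): immutable.Seq[Int] = {
    val result = Source(List(1, 2, 0, 4))
      .map(10 / _)
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
    Await.result(result, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("error-handling-example")
    try {
      println(recovered())
      println(resumed())
    } finally system.terminate()
  }
}
```

With `recover` the element after the failure (`4`) is never processed, while with `Resume` only the failing element is skipped.
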
### Stream Control and Lifecycle

Mechanisms for controlling stream lifecycle, applying back-pressure, rate limiting, and terminating streams from the outside.

```scala { .api }
// Flow control
def buffer(size: Int, overflowStrategy: OverflowStrategy): Source[Out, Mat]
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Source[Out, Mat]

// Kill switches for external termination
trait KillSwitch {
  def shutdown(): Unit
  def abort(ex: Throwable): Unit
}
```

[Stream Control](./stream-control.md)

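As an illustration of external termination, the sketch below throttles an infinite source and stops it through a kill switch obtained from `KillSwitches.single`. It assumes Akka 2.6 or later; `ControlExample` and its method names are hypothetical.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object ControlExample {
  // An infinite, rate-limited stream that only ends when the switch is used.
  def stopInfiniteStream()(implicit system: ActorSystem): Done = {
    val (killSwitch, done) =
      Source.repeat("tick")
        .throttle(1, 100.millis, 1, ThrottleMode.Shaping)
        .viaMat(KillSwitches.single)(Keep.right)
        .toMat(Sink.ignore)(Keep.both)
        .run()

    killSwitch.shutdown() // completes downstream and cancels upstream
    Await.result(done, 3.seconds)
  }

  // With the backpressure strategy a full buffer slows upstream; nothing is dropped.
  def buffered()(implicit system: ActorSystem): immutable.Seq[Int] =
    Await.result(
      Source(1 to 20).buffer(4, OverflowStrategy.backpressure).runWith(Sink.seq),
      3.seconds)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("control-example")
    try {
      println(stopInfiniteStream())
      println(buffered())
    } finally system.terminate()
  }
}
```
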
### Custom Stages

API for creating custom stream processing operators using GraphStage for advanced use cases requiring fine-grained control over stream behavior.

```scala { .api }
abstract class GraphStage[S <: Shape] extends Graph[S, NotUsed] {
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic
}

abstract class GraphStageLogic(val shape: Shape) {
  def setHandler(in: Inlet[_], handler: InHandler): Unit
  def setHandler(out: Outlet[_], handler: OutHandler): Unit
  def push[T](out: Outlet[T], elem: T): Unit
  def pull[T](in: Inlet[T]): Unit
}
```

[Custom Stages](./custom-stages.md)

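A minimal custom operator might look like the sketch below: a one-in, one-out stage that doubles each integer. It assumes Akka 2.6 or later; `Doubler` and `CustomStageExample` are hypothetical names, and a plain `map` would normally be preferred for logic this simple.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stage: doubles every element that passes through.
final class Doubler extends GraphStage[FlowShape[Int, Int]] {
  val in: Inlet[Int] = Inlet[Int]("Doubler.in")
  val out: Outlet[Int] = Outlet[Int]("Doubler.out")
  override val shape: FlowShape[Int, Int] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Upstream delivered an element: transform it and push it downstream.
      setHandler(in, new InHandler {
        override def onPush(): Unit = push(out, grab(in) * 2)
      })
      // Downstream signalled demand: forward that demand upstream.
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

object CustomStageExample {
  def doubled()(implicit system: ActorSystem): immutable.Seq[Int] =
    Await.result(Source(1 to 3).via(new Doubler).runWith(Sink.seq), 3.seconds)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("custom-stage-example")
    try println(doubled()) finally system.terminate()
  }
}
```

The handlers show how back-pressure is expressed at this level: the stage pulls upstream only after downstream has pulled it.
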
### Integration

Integration with file systems, TCP/TLS networking, actors, and external reactive streams publishers/subscribers.

```scala { .api }
// File I/O
object FileIO {
  def fromPath(f: Path): Source[ByteString, Future[IOResult]]
  def toPath(f: Path): Sink[ByteString, Future[IOResult]]
}

// TCP networking (an Akka extension; obtain an instance with Tcp(system))
final class Tcp extends akka.actor.Extension {
  def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]]
  def bind(interface: String, port: Int): Source[IncomingConnection, Future[ServerBinding]]
}
```

[Integration](./integration.md)

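The file I/O part can be sketched as a write followed by a line-by-line read. This assumes Akka 2.6 or later and uses `Framing.delimiter` to split the byte stream into lines; `FileRoundTrip` and `roundTrip` are hypothetical names, and the temp file is created only for the demonstration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString

import java.nio.file.{Files, Path}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object FileRoundTrip {
  def roundTrip(path: Path)(implicit system: ActorSystem): immutable.Seq[String] = {
    // Write two lines; the materialized Future[IOResult] completes when writing is done.
    val written = Source.single(ByteString("hello\nworld\n")).runWith(FileIO.toPath(path))
    Await.result(written, 3.seconds)

    // Read the file back as chunks of bytes and split the chunks into lines.
    val lines = FileIO.fromPath(path)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
      .map(_.utf8String)
      .runWith(Sink.seq)
    Await.result(lines, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("file-round-trip")
    val path = Files.createTempFile("akka-stream-example", ".txt")
    try println(roundTrip(path))
    finally {
      Files.deleteIfExists(path)
      system.terminate()
    }
  }
}
```
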
## Types

### Core Types

```scala { .api }
// Materialized value of operations that produce nothing useful (akka.NotUsed)
sealed abstract class NotUsed
case object NotUsed extends NotUsed

// Completion marker
sealed abstract class Done
case object Done extends Done

// Stream shapes
final case class SourceShape[+T](out: Outlet[T]) extends Shape
final case class FlowShape[-I, +O](in: Inlet[I], out: Outlet[O]) extends Shape
final case class SinkShape[-T](in: Inlet[T]) extends Shape
sealed abstract class ClosedShape extends Shape
object ClosedShape extends ClosedShape

// Overflow strategies for buffering, obtained through factory methods
sealed abstract class OverflowStrategy
object OverflowStrategy {
  def dropHead: OverflowStrategy
  def dropTail: OverflowStrategy
  def dropBuffer: OverflowStrategy
  def dropNew: OverflowStrategy
  def backpressure: OverflowStrategy
  def fail: OverflowStrategy
}
```