# Akka Streams

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space. This library provides a domain-specific language for expressing complex data transformation pipelines with automatic backpressure management, enabling developers to build robust, asynchronous stream processing applications.

## Package Information

- **Package Name**: com.typesafe.akka:akka-stream_2.13.0-M5
- **Package Type**: maven
- **Language**: Scala (with Java API)
- **Version**: 2.5.23
- **Installation**: Add to `build.sbt`: `libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.23"`

## Core Imports

```scala
import akka.stream.scaladsl.{Source, Flow, Sink}
import akka.stream.{ActorMaterializer, Materializer}
import akka.actor.ActorSystem
import akka.{Done, NotUsed}
```

Java API:
```java
import akka.stream.javadsl.*;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.actor.ActorSystem;
```

## Basic Usage

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}

import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("MySystem")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Create a simple stream: numbers 1 to 10, multiply by 2, print results
val source: Source[Int, NotUsed] = Source(1 to 10)
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

// runWith materializes the stream; the Future completes when the stream finishes
val result: Future[Done] = source
  .map(_ * 2)
  .runWith(sink)

// Prints 2, 4, 6, 8, 10, 12, 14, 16, 18, 20 (one value per line)
```

## Architecture

Akka Streams is built around several key abstractions:

- **Graph**: The blueprint of a stream processing topology
- **Shape**: Defines the inlets and outlets of a graph component
- **Materializer**: Responsible for turning graph blueprints into running streams
- **Source**: Stream component with one output (data producer)
- **Flow**: Stream component with one input and one output (data transformer)
- **Sink**: Stream component with one input (data consumer)

All stream processing is built using these composable, type-safe building blocks that automatically handle backpressure according to the Reactive Streams specification.
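
The split between blueprint and running stream can be illustrated with a minimal sketch, assuming akka-stream 2.5.x is on the classpath. The object name `BlueprintSketch` and its members are hypothetical names chosen for illustration. The same `RunnableGraph` is materialized twice, and each run produces its own independent result.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BlueprintSketch {
  // Blueprints: nothing runs while these values are being defined.
  val numbers: Source[Int, NotUsed] = Source(1 to 5)
  val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
  val summing: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // A closed graph (Source through Flow into Sink) is still only a description.
  val graph: RunnableGraph[Future[Int]] =
    numbers.via(doubler).toMat(summing)(Keep.right)

  def runTwice(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("blueprint-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Each run() materializes a fresh, independent stream from the blueprint.
      val first = Await.result(graph.run(), 5.seconds)
      val second = Await.result(graph.run(), 5.seconds)
      (first, second)
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```

Calling `BlueprintSketch.runTwice()` should yield the same sum for both runs, since the blueprint itself holds no state.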

## Capabilities

### Core Stream Components

The fundamental building blocks for creating reactive streams with type-safe composition and automatic backpressure handling.

```scala { .api }
// Source - produces elements
final class Source[+Out, +Mat](traversalBuilder: LinearTraversalBuilder, shape: SourceShape[Out])

// Flow - transforms elements
final class Flow[-In, +Out, +Mat](traversalBuilder: LinearTraversalBuilder, shape: FlowShape[In, Out])

// Sink - consumes elements
final class Sink[-In, +Mat](traversalBuilder: LinearTraversalBuilder, shape: SinkShape[In])
```

[Core Stream Components](./core-components.md)
81
82
### Graph Building and Composition
83
84
Advanced graph construction using GraphDSL for complex stream topologies including fan-in, fan-out, and custom shapes.
85
86
```scala { .api }
87
object GraphDSL {
88
def create[S <: Shape, Mat](buildBlock: GraphDSL.Builder[Mat] => S): Graph[S, Mat]
89
90
class Builder[+M] {
91
def add[S <: Shape](graph: Graph[S, _]): S
92
// Connection operators: ~>, <~, via, to, from
93
}
94
}
95
```
96
97
[Graph Building and Composition](./graph-building.md)
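
As a rough illustration of fan-out followed by fan-in, the sketch below wraps a `Broadcast` and a `Merge` into a reusable `Flow`. It assumes akka-stream 2.5.x and uses the no-argument `GraphDSL.create()` form; `GraphDslSketch` and `fanOutFanIn` are hypothetical names. The result is sorted because `Merge` does not guarantee the order in which the two branches are interleaved.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object GraphDslSketch {
  // A Flow-shaped graph: every input goes to two branches, whose outputs are merged.
  val fanOutFanIn: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val bcast = builder.add(Broadcast[Int](2))
      val merge = builder.add(Merge[Int](2))

      bcast.out(0) ~> Flow[Int].map(_ + 1) ~> merge.in(0)
      bcast.out(1) ~> Flow[Int].map(_ * 10) ~> merge.in(1)

      // Expose the one open inlet and the one open outlet as a FlowShape.
      FlowShape(bcast.in, merge.out)
    })

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("graphdsl-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val collected = Source(1 to 3).via(fanOutFanIn).runWith(Sink.seq)
      // Sort because the interleaving of the two branches is not deterministic.
      Await.result(collected, 5.seconds).sorted
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```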

### Stream Operations and Transformations

Comprehensive set of stream processing operations including mapping, filtering, grouping, timing, and error handling.

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def map[T](f: Out => T): Repr[T]
  def filter(p: Out => Boolean): Repr[Out]
  def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]
  def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Repr, Closed]
  def throttle(elements: Int, per: FiniteDuration): Repr[Out]
}
```

[Stream Operations and Transformations](./stream-operations.md)
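
A few of these operators can be chained as in the following sketch, which assumes akka-stream 2.5.x; `OperatorsSketch` is a hypothetical name. It filters, maps, and then applies `mapAsync`, which emits results in upstream order even though the futures may complete out of order.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object OperatorsSketch {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("operators-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    // Execution context for the Futures created inside mapAsync.
    implicit val ec: ExecutionContext = system.dispatcher
    try {
      val labelled = Source(1 to 10)
        .filter(_ % 2 == 0)                        // keep even numbers
        .map(n => n * n)                           // square them
        .mapAsync(2)(n => Future(s"value-$n"))     // at most 2 futures in flight
        .runWith(Sink.seq)
      Await.result(labelled, 5.seconds)
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```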

### Materialization and Execution

Stream materialization with ActorMaterializer, lifecycle management, and execution control.

```scala { .api }
abstract class Materializer {
  def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
  def withNamePrefix(name: String): Materializer
}

object ActorMaterializer {
  def apply()(implicit context: ActorRefFactory): ActorMaterializer
}
```

[Materialization and Execution](./materialization.md)
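
Materialization returns a value that acts as a handle on the running stream. The sketch below, assuming akka-stream 2.5.x, keeps both materialized values with `Keep.both`: the `Promise` from `Source.maybe` is used to push one element in, and the `Future` from `Sink.head` delivers it back out. `MaterializedValueSketch` is a hypothetical name.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object MaterializedValueSketch {
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("materialization-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // run() starts the stream and hands back both materialized values.
      val (promise, firstElement): (Promise[Option[Int]], Future[Int]) =
        Source.maybe[Int].toMat(Sink.head[Int])(Keep.both).run()

      // Completing the promise makes the source emit one element and finish.
      promise.success(Some(42))
      Await.result(firstElement, 5.seconds)
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```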

### Junction Operations

Stream junction operators for merging, broadcasting, zipping, and partitioning multiple streams.

```scala { .api }
class Merge[T](inputPorts: Int, eagerComplete: Boolean)
class Broadcast[T](outputPorts: Int, eagerCancel: Boolean)
class Zip[A, B] extends ZipWith2[A, B, (A, B)]
class Partition[T](outputPorts: Int, partitioner: T => Int)
```

[Junction Operations](./junction-operations.md)
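
To show one junction in a closed graph, the following sketch routes numbers to two sinks with `Partition`, assuming akka-stream 2.5.x. `PartitionSketch` is a hypothetical name. The two `Sink.seq` instances are passed to `GraphDSL.create` so that their materialized futures are available after `run()`.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PartitionSketch {
  // Closed graph: one source, a two-way partition, and two collecting sinks.
  val graph: RunnableGraph[(Future[immutable.Seq[Int]], Future[immutable.Seq[Int]])] =
    RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[Int], Sink.seq[Int])((_, _)) { implicit builder => (evens, odds) =>
        import GraphDSL.Implicits._

        // The partitioner returns the index of the output port for each element.
        val partition = builder.add(Partition[Int](2, n => if (n % 2 == 0) 0 else 1))

        Source(1 to 6) ~> partition.in
        partition.out(0) ~> evens.in
        partition.out(1) ~> odds.in
        ClosedShape
      })

  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("partition-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val (evenResult, oddResult) = graph.run()
      (Await.result(evenResult, 5.seconds), Await.result(oddResult, 5.seconds))
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```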
144
145
### I/O Integration
146
147
File I/O, TCP networking, and integration with Java streams and other I/O systems.
148
149
```scala { .api }
150
object FileIO {
151
def fromPath(path: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
152
def toPath(path: Path): Sink[ByteString, Future[IOResult]]
153
}
154
155
object Tcp {
156
def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]]
157
}
158
```
159
160
[I/O Integration](./io-integration.md)
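
A file round trip might look like the sketch below, which assumes akka-stream 2.5.x and write access to the system temp directory. `FileIoSketch` and the temp file prefix are hypothetical. It writes two lines with `FileIO.toPath`, then reads the bytes back with `FileIO.fromPath` and concatenates the chunks.

```scala
import java.nio.file.Files

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object FileIoSketch {
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("fileio-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val path = Files.createTempFile("akka-stream-sketch", ".txt")
    try {
      // Write: the materialized Future[IOResult] completes once the file is written.
      val written = Source(List("alpha\n", "beta\n"))
        .map(line => ByteString(line))
        .runWith(FileIO.toPath(path))
      Await.result(written, 5.seconds)

      // Read: chunks arrive as ByteString and are concatenated in order.
      val readBack = FileIO.fromPath(path).runFold(ByteString.empty)(_ ++ _)
      Await.result(readBack, 5.seconds).utf8String
    } finally {
      Files.deleteIfExists(path)
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```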

### Error Handling and Supervision

Comprehensive error handling with supervision strategies, recovery operations, and stream resilience patterns.

```scala { .api }
object Supervision {
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive

  type Decider = Function[Throwable, Directive]
}
```

[Error Handling and Supervision](./error-handling.md)
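
A `Decider` is typically attached through stream attributes. The following sketch assumes akka-stream 2.5.x and uses `ActorAttributes.supervisionStrategy`; `SupervisionSketch` is a hypothetical name. The element that triggers a division by zero is dropped and the stream continues, while any other failure would stop it.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object SupervisionSketch {
  // Resume (drop the failing element) on arithmetic errors, stop on anything else.
  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _                      => Supervision.Stop
  }

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("supervision-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val results = Source(List(1, 0, 2, 4))
        .map(n => 100 / n) // throws ArithmeticException for 0
        .withAttributes(ActorAttributes.supervisionStrategy(decider))
        .runWith(Sink.seq)
      Await.result(results, 5.seconds)
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```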

### Control Flow and Lifecycle

Stream lifecycle management with KillSwitch, StreamRefs for distribution, and queue integration.

```scala { .api }
trait KillSwitch {
  def shutdown(): Unit
  def abort(ex: Throwable): Unit
}

object KillSwitches {
  def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch]
  def shared(name: String): SharedKillSwitch
}
```

[Control Flow and Lifecycle](./control-flow.md)
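
As a small sketch of external lifecycle control, assuming akka-stream 2.5.x, the code below stops an otherwise endless tick source through a `UniqueKillSwitch`. `KillSwitchSketch` is a hypothetical name. `shutdown()` completes the stream normally, so the sink's `Future[Done]` succeeds; `abort` would fail it instead.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object KillSwitchSketch {
  def run(): Done = {
    implicit val system: ActorSystem = ActorSystem("killswitch-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Keep the kill switch from the flow and the completion future from the sink.
      val (killSwitch, completion) =
        Source.tick(0.millis, 10.millis, "tick")
          .viaMat(KillSwitches.single[String])(Keep.right)
          .toMat(Sink.ignore)(Keep.both)
          .run()

      // Without this call the tick source would run until the system terminates.
      killSwitch.shutdown()
      Await.result(completion, 5.seconds)
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```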

## Types

```scala { .api }
// Fundamental types (each has a single value, the case object of the same name)
sealed abstract class NotUsed // akka.NotUsed
sealed abstract class Done    // akka.Done

// Shape hierarchy
abstract class Shape {
  def inlets: immutable.Seq[Inlet[_]]
  def outlets: immutable.Seq[Outlet[_]]
}

case class SourceShape[+T](out: Outlet[T]) extends Shape
case class FlowShape[-I, +O](in: Inlet[I], out: Outlet[O]) extends Shape
case class SinkShape[-T](in: Inlet[T]) extends Shape

// Ports
final class Inlet[T](s: String)
final class Outlet[T](s: String)

// Materialization
trait Graph[+S <: Shape, +M] {
  def shape: S
  def withAttributes(attr: Attributes): Graph[S, M]
}

// Results
case class IOResult(count: Long, status: Try[Done]) {
  def wasSuccessful: Boolean
}

// Queue integration
sealed abstract class QueueOfferResult
object QueueOfferResult {
  case object Enqueued extends QueueOfferResult
  case object Dropped extends QueueOfferResult
  case class Failure(cause: Throwable) extends QueueOfferResult
  case object QueueClosed extends QueueOfferResult
}

// Strategies
sealed abstract class OverflowStrategy
object OverflowStrategy {
  def dropHead: OverflowStrategy
  def dropTail: OverflowStrategy
  def backpressure: OverflowStrategy
  def fail: OverflowStrategy
}

// Attributes
final case class Attributes(attributeList: List[Attributes.Attribute]) {
  def and(other: Attributes): Attributes
  def get[T <: Attributes.Attribute: ClassTag]: Option[T]
}
```
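
The queue-related types above come together in `Source.queue`. The sketch below assumes akka-stream 2.5.x; `QueueSketch` and the buffer size of 8 are illustrative choices. Each `offer` is awaited before the next one, because with `OverflowStrategy.backpressure` a second offer issued while an earlier one is still pending would fail.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object QueueSketch {
  def run(): (QueueOfferResult, Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("queue-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // The queue is the source's materialized value; the Future comes from the sink.
      val (queue, collected) =
        Source.queue[Int](8, OverflowStrategy.backpressure)
          .toMat(Sink.seq[Int])(Keep.both)
          .run()

      // offer returns a Future[QueueOfferResult] describing what happened to the element.
      val firstOffer = Await.result(queue.offer(1), 5.seconds)
      Await.result(queue.offer(2), 5.seconds)

      // Completing the queue lets buffered elements drain, then ends the stream.
      queue.complete()
      (firstOffer, Await.result(collected, 5.seconds))
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```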