# Akka Streams

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space. This library provides a domain-specific language for expressing complex data transformation pipelines with automatic backpressure management, enabling developers to build robust, asynchronous stream processing applications.

## Package Information

- **Package Name**: com.typesafe.akka:akka-stream_2.13.0-M5
- **Package Type**: maven
- **Language**: Scala (with Java API)
- **Version**: 2.5.23
- **Installation**: Add to `build.sbt`: `libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.23"`

## Core Imports

```scala
import akka.stream.scaladsl.{Source, Flow, Sink}
import akka.stream.{ActorMaterializer, Materializer}
import akka.actor.ActorSystem
import akka.{Done, NotUsed}
```

Java API:

```java
import akka.stream.javadsl.*;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.actor.ActorSystem;
```

## Basic Usage

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("MySystem")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Create a simple stream: numbers 1 to 10, multiply by 2, print results
val source: Source[Int, NotUsed] = Source(1 to 10)
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

// Nothing runs until runWith materializes the stream
val result: Future[Done] = source
  .map(_ * 2)
  .runWith(sink)

// Prints 2, 4, 6, 8, 10, 12, 14, 16, 18, 20 (one value per line)
```

## Architecture

Akka Streams is built around several key abstractions:

- **Graph**: The blueprint of a stream processing topology
- **Shape**: Defines the inlets and outlets of a graph component
- **Materializer**: Responsible for turning graph blueprints into running streams
- **Source**: Stream component with one output (data producer)
- **Flow**: Stream component with one input and one output (data transformer)
- **Sink**: Stream component with one input (data consumer)

All stream processing is built using these composable, type-safe building blocks that automatically handle backpressure according to the Reactive Streams specification.
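
The relationship between these abstractions can be sketched as follows, assuming Akka 2.5.23 as listed under Package Information. The names `numbers`, `square`, `sum`, and `blueprint` are illustrative and not part of the library. The snippet is written in the same script style as Basic Usage, so it can be pasted into a REPL or wrapped in an `object ... extends App`.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("ArchitectureSketch")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Three reusable blueprints; defining them does not start any processing.
val numbers: Source[Int, NotUsed] = Source(1 to 5)
val square: Flow[Int, Int, NotUsed] = Flow[Int].map(n => n * n)
val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

// Connecting all open ports yields a closed graph.
// Keep.right selects the sink's materialized value (the Future of the sum).
val blueprint: RunnableGraph[Future[Int]] =
  numbers.via(square).toMat(sum)(Keep.right)

// run() hands the blueprint to the implicit materializer, which starts the stream.
val total: Int = Await.result(blueprint.run(), 3.seconds)
println(total) // 1 + 4 + 9 + 16 + 25 = 55

Await.result(system.terminate(), 10.seconds)
```

Because `blueprint` is only a description, it can be run again to start a second, independent stream.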

## Capabilities

### Core Stream Components

The fundamental building blocks for creating reactive streams with type-safe composition and automatic backpressure handling.

```scala { .api }
// Source - produces elements
final class Source[+Out, +Mat](traversalBuilder: LinearTraversalBuilder, shape: SourceShape[Out])

// Flow - transforms elements
final class Flow[-In, +Out, +Mat](traversalBuilder: LinearTraversalBuilder, shape: FlowShape[In, Out])

// Sink - consumes elements
final class Sink[-In, +Mat](traversalBuilder: LinearTraversalBuilder, shape: SinkShape[In])
```

[Core Stream Components](./core-components.md)

### Graph Building and Composition

Advanced graph construction using GraphDSL for complex stream topologies including fan-in, fan-out, and custom shapes.

```scala { .api }
object GraphDSL {
  // Simplest overload; further overloads import existing graphs and expose their materialized values
  def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed]

  class Builder[+M] {
    def add[S <: Shape](graph: Graph[S, _]): S
    // Connection operators such as ~> and <~ are available after `import GraphDSL.Implicits._`
  }
}
```

[Graph Building and Composition](./graph-building.md)
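
As a rough sketch of a fan-out and fan-in topology, the graph below broadcasts each element to two branches and zips the results back into pairs. It assumes Akka 2.5.23 and uses the `GraphDSL.create` overload that imports an existing sink so its materialized value stays accessible. The names `collect`, `graph`, and `pairs` are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("GraphSketch")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val collect: Sink[(Int, Int), Future[immutable.Seq[(Int, Int)]]] = Sink.seq[(Int, Int)]

val graph: RunnableGraph[Future[immutable.Seq[(Int, Int)]]] =
  RunnableGraph.fromGraph(GraphDSL.create(collect) { implicit builder => out =>
    import GraphDSL.Implicits._

    // Junctions must be added to the builder before they can be wired.
    val broadcast = builder.add(Broadcast[Int](2))
    val zip = builder.add(Zip[Int, Int]())

    Source(1 to 3) ~> broadcast
    broadcast.out(0) ~> Flow[Int].map(_ + 10) ~> zip.in0
    broadcast.out(1) ~> Flow[Int].map(_ * 10) ~> zip.in1
    zip.out ~> out.in

    // Every port is connected, so the graph is closed and runnable.
    ClosedShape
  })

val pairs = Await.result(graph.run(), 3.seconds)
println(pairs) // Vector((11,10), (12,20), (13,30))

Await.result(system.terminate(), 10.seconds)
```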

### Stream Operations and Transformations

Comprehensive set of stream processing operations including mapping, filtering, grouping, timing, and error handling.

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def map[T](f: Out => T): Repr[T]
  def filter(p: Out => Boolean): Repr[Out]
  def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]
  def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Repr, Closed]
  def throttle(elements: Int, per: FiniteDuration): Repr[Out]
}
```

[Stream Operations and Transformations](./stream-operations.md)
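
A few of these operators can be chained as in the sketch below, which assumes Akka 2.5.23. The `lookup` function is a hypothetical stand-in for an asynchronous call such as a database or HTTP request. `mapAsync` runs up to `parallelism` futures at once but still emits results in upstream order.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("OperatorsSketch")
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher // ExecutionContext for the Future below

// Hypothetical asynchronous lookup, not part of Akka.
def lookup(n: Int): Future[String] = Future(s"item-$n")

val labels = Await.result(
  Source(1 to 10)
    .filter(_ % 2 == 0)              // keep 2, 4, 6, 8, 10
    .map(_ * 3)                      // 6, 12, 18, 24, 30
    .mapAsync(parallelism = 4)(lookup)
    .runWith(Sink.seq),
  3.seconds
)
println(labels) // Vector(item-6, item-12, item-18, item-24, item-30)

Await.result(system.terminate(), 10.seconds)
```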

### Materialization and Execution

Stream materialization with ActorMaterializer, lifecycle management, and execution control.

```scala { .api }
abstract class Materializer {
  def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
  def withNamePrefix(name: String): Materializer
}

object ActorMaterializer {
  def apply()(implicit context: ActorRefFactory): ActorMaterializer
}
```

[Materialization and Execution](./materialization.md)
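
The sketch below, assuming Akka 2.5.23, materializes one blueprint twice: once through `run()` and once by calling `materialize` on the materializer directly. Each materialization starts an independent stream with its own materialized value. The name `countElements` is illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("MaterializationSketch")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// A blueprint that counts its elements; the count is the materialized value.
val countElements: RunnableGraph[Future[Int]] =
  Source(1 to 100).toMat(Sink.fold[Int, Int](0)((acc, _) => acc + 1))(Keep.right)

// run() uses the implicit materializer ...
val viaRun: Int = Await.result(countElements.run(), 3.seconds)

// ... and is equivalent to passing the graph to the materializer explicitly.
val viaMaterializer: Int = Await.result(materializer.materialize(countElements), 3.seconds)

println((viaRun, viaMaterializer)) // (100,100)

// Terminating the actor system also stops streams started by this materializer.
Await.result(system.terminate(), 10.seconds)
```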

### Junction Operations

Stream junction operators for merging, broadcasting, zipping, and partitioning multiple streams.

```scala { .api }
class Merge[T](inputPorts: Int, eagerComplete: Boolean)
class Broadcast[T](outputPorts: Int, eagerCancel: Boolean)
class Zip[A, B] extends ZipWith2[A, B, (A, B)]
class Partition[T](outputPorts: Int, partitioner: T => Int)
```

[Junction Operations](./junction-operations.md)
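
One way to combine `Partition` and `Merge`, assuming Akka 2.5.23, is to wrap them in a reusable `Flow` that routes even and odd numbers through different branches. The name `tagEvenOdd` is illustrative. `Merge` makes no ordering guarantee across its inputs, so the example compares results as a set.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("JunctionSketch")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val tagEvenOdd: Flow[Int, String, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // The partitioner returns the index of the output port for each element.
    val partition = builder.add(Partition[Int](2, n => if (n % 2 == 0) 0 else 1))
    val merge = builder.add(Merge[String](2))

    partition.out(0) ~> Flow[Int].map(n => s"even:$n") ~> merge.in(0)
    partition.out(1) ~> Flow[Int].map(n => s"odd:$n") ~> merge.in(1)

    // One open inlet and one open outlet remain, which makes this a Flow.
    FlowShape(partition.in, merge.out)
  })

val tagged = Await.result(Source(1 to 4).via(tagEvenOdd).runWith(Sink.seq), 3.seconds)
println(tagged.toSet) // contains odd:1, even:2, odd:3, even:4

Await.result(system.terminate(), 10.seconds)
```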

### I/O Integration

File I/O, TCP networking, and integration with Java streams and other I/O systems.

```scala { .api }
object FileIO {
  def fromPath(path: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
  def toPath(path: Path): Sink[ByteString, Future[IOResult]]
}

object Tcp {
  def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]]
}
```

[I/O Integration](./io-integration.md)
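
A file round trip might look like the sketch below, which assumes Akka 2.5.23 and writes to a temporary file so that it has no side effects outside the temp directory. `Framing.delimiter` is used here to split the byte stream back into lines. The 256-byte frame limit is an arbitrary choice for this example.

```scala
import java.nio.file.{Files, Path}
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("FileIoSketch")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val path: Path = Files.createTempFile("akka-streams-io", ".txt")

// Write three newline-terminated lines; IOResult.count is the number of bytes written.
val written: IOResult = Await.result(
  Source(List("alpha", "beta", "gamma"))
    .map(line => ByteString(line + "\n"))
    .runWith(FileIO.toPath(path)),
  5.seconds
)
println(written.count) // 6 + 5 + 6 = 17 bytes

// Read the file back in chunks and reassemble the lines.
val lines = Await.result(
  FileIO.fromPath(path)
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256))
    .map(_.utf8String)
    .runWith(Sink.seq),
  5.seconds
)
println(lines) // Vector(alpha, beta, gamma)

Files.delete(path)
Await.result(system.terminate(), 10.seconds)
```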

### Error Handling and Supervision

Comprehensive error handling with supervision strategies, recovery operations, and stream resilience patterns.

```scala { .api }
object Supervision {
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive

  type Decider = Function[Throwable, Directive]
}
```

[Error Handling and Supervision](./error-handling.md)
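
The sketch below, assuming Akka 2.5.23, contrasts two approaches on the same failing input: a supervision `Decider` that resumes (dropping the offending element) and the `recover` operator (emitting a fallback element and then completing). The `decider` and the fallback value `-1` are illustrative choices.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("SupervisionSketch")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Skip elements that cause division by zero; stop the stream on anything else.
val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _                      => Supervision.Stop
}

val resumed = Await.result(
  Source(List(1, 2, 0, 5))
    .map(100 / _)
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
    .runWith(Sink.seq),
  3.seconds
)
println(resumed) // Vector(100, 50, 20): the element 0 was dropped

// recover replaces the failure with one final element, then completes the stream.
val recovered = Await.result(
  Source(List(1, 2, 0, 5))
    .map(100 / _)
    .recover { case _: ArithmeticException => -1 }
    .runWith(Sink.seq),
  3.seconds
)
println(recovered) // Vector(100, 50, -1): element 5 is never processed

Await.result(system.terminate(), 10.seconds)
```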

### Control Flow and Lifecycle

Stream lifecycle management with KillSwitch, StreamRefs for distribution, and queue integration.

```scala { .api }
trait KillSwitch {
  def shutdown(): Unit
  def abort(ex: Throwable): Unit
}

object KillSwitches {
  def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch]
  def shared(name: String): SharedKillSwitch
}
```

[Control Flow and Lifecycle](./control-flow.md)
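
As a small illustration, assuming Akka 2.5.23, the stream below would tick forever if left alone. Placing `KillSwitches.single` in the flow exposes a `UniqueKillSwitch` as a materialized value, and calling `shutdown()` completes the stream normally. The tick interval is an arbitrary choice.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("KillSwitchSketch")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Keep.right keeps the kill switch; Keep.both then pairs it with the sink's Future.
val (killSwitch, done): (UniqueKillSwitch, Future[Done]) =
  Source.tick(0.millis, 100.millis, "tick")
    .viaMat(KillSwitches.single[String])(Keep.right)
    .toMat(Sink.ignore)(Keep.both)
    .run()

// Completes downstream and cancels upstream; abort(ex) would fail the stream instead.
killSwitch.shutdown()

val outcome: Done = Await.result(done, 3.seconds)
println(outcome) // Done

Await.result(system.terminate(), 10.seconds)
```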

## Types

```scala { .api }
// Fundamental types (each has a single value of the same name: akka.NotUsed, akka.Done)
sealed abstract class NotUsed
sealed abstract class Done

// Shape hierarchy
abstract class Shape {
  def inlets: immutable.Seq[Inlet[_]]
  def outlets: immutable.Seq[Outlet[_]]
}

case class SourceShape[+T](out: Outlet[T]) extends Shape
case class FlowShape[-I, +O](in: Inlet[I], out: Outlet[O]) extends Shape
case class SinkShape[-T](in: Inlet[T]) extends Shape

// Ports
final class Inlet[T](s: String)
final class Outlet[T](s: String)

// Materialization
trait Graph[+S <: Shape, +M] {
  def shape: S
  def withAttributes(attr: Attributes): Graph[S, M]
}

// Results
case class IOResult(count: Long, status: Try[Done]) {
  def wasSuccessful: Boolean
}

// Queue integration
sealed abstract class QueueOfferResult
object QueueOfferResult {
  case object Enqueued extends QueueOfferResult
  case object Dropped extends QueueOfferResult
  case class Failure(cause: Throwable) extends QueueOfferResult
  case object QueueClosed extends QueueOfferResult
}

// Strategies
sealed abstract class OverflowStrategy
object OverflowStrategy {
  def dropHead: OverflowStrategy
  def dropTail: OverflowStrategy
  def backpressure: OverflowStrategy
  def fail: OverflowStrategy
}

// Attributes
final case class Attributes(attributeList: List[Attributes.Attribute]) {
  def and(other: Attributes): Attributes
  def get[T <: Attributes.Attribute: ClassTag]: Option[T]
}
```
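
To show how `QueueOfferResult` and `OverflowStrategy` fit together, the sketch below (assuming Akka 2.5.23) pushes elements into a stream through `Source.queue` and pattern matches on the result of each offer. The helper `describe` is hypothetical. With `OverflowStrategy.backpressure`, each offer is awaited before the next one is made.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("QueueSketch")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// The source materializes a queue handle; the sink materializes the collected elements.
val (queue, collected) =
  Source.queue[Int](8, OverflowStrategy.backpressure)
    .toMat(Sink.seq)(Keep.both)
    .run()

// Hypothetical helper covering every QueueOfferResult case.
def describe(result: QueueOfferResult): String = result match {
  case QueueOfferResult.Enqueued    => "enqueued"
  case QueueOfferResult.Dropped     => "dropped"
  case QueueOfferResult.Failure(ex) => s"failed: ${ex.getMessage}"
  case QueueOfferResult.QueueClosed => "queue closed"
}

// Offer elements one at a time, waiting for each result before the next offer.
val outcomes = (1 to 3).map(n => describe(Await.result(queue.offer(n), 3.seconds)))

// Signal that no more elements will be offered so the stream can complete.
queue.complete()
val items = Await.result(collected, 3.seconds)

println(outcomes)
println(items)

Await.result(system.terminate(), 10.seconds)
```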