# Akka Stream

Akka Stream is a stream processing library built on top of Akka actors that implements the Reactive Streams specification. It provides a high-level DSL of composable operators for building resilient, distributed, and concurrent stream processing applications, with back-pressure applied across the whole pipeline.

## Package Information

- **Package Name**: com.typesafe.akka:akka-stream_2.12
- **Package Type**: maven
- **Language**: Scala
- **Installation**:
  - sbt: `libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.8.8"`
  - Maven: `<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-stream_2.12</artifactId><version>2.8.8</version></dependency>`

## Core Imports

### Scala DSL

```scala
import akka.stream.scaladsl.{Source, Flow, Sink, RunnableGraph}
import akka.stream.Materializer
import akka.NotUsed
```

### Java DSL

```java
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.Materializer;
```

## Basic Usage

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, Sink}
import akka.stream.Materializer

implicit val system: ActorSystem = ActorSystem("stream-system")
// Optional since Akka 2.6: an implicit ActorSystem already provides a default Materializer
implicit val materializer: Materializer = Materializer(system)

// Create a source of numbers 1 to 10
val source = Source(1 to 10)

// Transform and process the stream
val result = source
  .map(_ * 2)      // 2, 4, ..., 20
  .filter(_ > 10)  // keeps 12, 14, 16, 18, 20
  .runWith(Sink.seq)

// result is a Future[Seq[Int]] that completes with Seq(12, 14, 16, 18, 20)
```

## Architecture

Akka Stream is built around several key architectural concepts:

- **Reactive Streams**: Implements the Reactive Streams specification for asynchronous stream processing with non-blocking back-pressure
- **Graph DSL**: High-level declarative API for describing stream processing topologies as graphs of sources, flows, and sinks
- **Materialization**: Two-phase execution in which a stream is first described as a reusable blueprint, then materialized into a running stream
- **Back-pressure**: Automatic flow control in which downstream stages signal demand, so a fast producer cannot overwhelm a slower consumer
- **Actor Integration**: Built on Akka actors for distribution, fault tolerance, and scalability
- **Type Safety**: Element types and materialized value types are checked by Scala's type system throughout the pipeline
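
The two-phase materialization model listed above can be illustrated with a minimal sketch. It assumes Akka 2.6 or later, where an implicit `ActorSystem` supplies the materializer. The object name `MaterializationSketch`, the system name, and the 5-second timeout are illustrative choices, not part of the library.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object MaterializationSketch {
  // Runs the same blueprint twice and returns both results.
  def runTwice(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("materialization-sketch")
    try {
      // Phase 1: describe the stream. Nothing is executed yet.
      val blueprint: RunnableGraph[Future[Int]] =
        Source(1 to 100).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

      // Phase 2: every run() materializes a new, independent running stream.
      val first = Await.result(blueprint.run(), 5.seconds)
      val second = Await.result(blueprint.run(), 5.seconds)
      (first, second)
    } finally system.terminate()
  }
}
```

Because the blueprint is an immutable description, both runs are independent and each sums the full range.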


## Capabilities

### Core Stream Types

The fundamental building blocks for creating and composing stream processing pipelines. These types define how data enters, flows through, and leaves a stream.

```scala { .api }
// Declarations simplified: in Akka these are final classes, not traits

// Source: exactly one output, no inputs
final class Source[+Out, +Mat] extends Graph[SourceShape[Out], Mat]

// Flow: processing step with exactly one input and one output
final class Flow[-In, +Out, +Mat] extends Graph[FlowShape[In, Out], Mat]

// Sink: endpoint with exactly one input that consumes elements
final class Sink[-In, +Mat] extends Graph[SinkShape[In], Mat]

// RunnableGraph: fully connected graph with no open ports, ready for execution
final case class RunnableGraph[+Mat](/* internal */) extends Graph[ClosedShape, Mat]
```

[Core Stream Types](./core-stream-types.md)
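
As a minimal sketch of how these four types fit together, the example below connects a `Source`, a `Flow`, and a `Sink` into a `RunnableGraph` and runs it. The object name `CoreTypesSketch`, the value names, and the timeout are hypothetical, and the implicit materializer is assumed to come from the `ActorSystem` (Akka 2.6 or later).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object CoreTypesSketch {
  val numbers: Source[Int, NotUsed] = Source(1 to 5)
  val square: Flow[Int, Int, NotUsed] = Flow[Int].map(n => n * n)
  val collectAll: Sink[Int, Future[immutable.Seq[Int]]] = Sink.seq[Int]

  // Source + Flow + Sink leaves no open ports, which gives a RunnableGraph.
  // Keep.right selects the sink's materialized value (the Future).
  val graph: RunnableGraph[Future[immutable.Seq[Int]]] =
    numbers.via(square).toMat(collectAll)(Keep.right)

  def run(): immutable.Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("core-types-sketch")
    try Await.result(graph.run(), 5.seconds)
    finally system.terminate()
  }
}
```

Calling `CoreTypesSketch.run()` yields the squares of 1 to 5 in order.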

### Stream Sources

Factory methods and utilities for creating stream sources from various data sources including collections, futures, actors, and external systems.

```scala { .api }
object Source {
  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed]
  def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed]
  def future[T](futureElement: Future[T]): Source[T, NotUsed]
  def single[T](element: T): Source[T, NotUsed]
  def empty[T]: Source[T, NotUsed]
  def repeat[T](element: T): Source[T, NotUsed]
}
```

[Stream Sources](./stream-sources.md)
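
The factory methods above could be exercised roughly as follows. This is a sketch under the assumption of Akka 2.6 or later; `SourcesSketch` and the sample values are made up for illustration. Infinite sources (`fromIterator` over an unbounded iterator, `repeat`) are bounded with `take` so the streams complete.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object SourcesSketch {
  def run(): Seq[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("sources-sketch")
    implicit val ec: ExecutionContext = system.dispatcher
    val sources: List[Source[Int, NotUsed]] = List(
      Source(List(1, 2, 3)),                                // from a collection
      Source.fromIterator(() => Iterator.from(10)).take(3), // fresh iterator per run
      Source.future(Future.successful(42)),                 // single element from a Future
      Source.single(7),                                     // exactly one element
      Source.empty[Int],                                    // completes immediately
      Source.repeat(0).take(2)                              // infinite, bounded by take
    )
    // Run each source into Sink.seq and wait for all results.
    try Await.result(Future.sequence(sources.map(_.runWith(Sink.seq))), 5.seconds)
    finally system.terminate()
  }
}
```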

### Stream Transformations

Core transformation operators for manipulating, filtering, grouping, and routing stream elements with strong type safety and back-pressure support.

```scala { .api }
// Signatures as seen on Source[Out, Mat]

// Basic transformations
def map[T2](f: Out => T2): Source[T2, Mat]
def filter(p: Out => Boolean): Source[Out, Mat]
def collect[T2](pf: PartialFunction[Out, T2]): Source[T2, Mat]

// Async transformations
def mapAsync[T2](parallelism: Int)(f: Out => Future[T2]): Source[T2, Mat]
def mapAsyncUnordered[T2](parallelism: Int)(f: Out => Future[T2]): Source[T2, Mat]

// Grouping and batching
def grouped(n: Int): Source[immutable.Seq[Out], Mat]
def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Source[Out, Mat]#Repr, RunnableGraph[Mat]]
```

[Stream Transformations](./stream-transformations.md)
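
A short sketch chaining several of these operators is shown below. The `describe` function is a hypothetical stand-in for an asynchronous call such as a remote lookup, and `TransformationsSketch` is an illustrative name. `mapAsync` is used rather than `mapAsyncUnordered` so that the output order matches the input order.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object TransformationsSketch {
  def run(): immutable.Seq[immutable.Seq[String]] = {
    implicit val system: ActorSystem = ActorSystem("transformations-sketch")
    implicit val ec: ExecutionContext = system.dispatcher

    // Hypothetical async step standing in for a remote call.
    def describe(n: Int): Future[String] = Future(s"item-$n")

    val result = Source(1 to 10)
      .filter(_ % 2 == 0)                    // 2, 4, 6, 8, 10
      .collect { case n if n > 2 => n * 10 } // 40, 60, 80, 100
      .mapAsync(parallelism = 2)(describe)   // up to 2 futures in flight, order preserved
      .grouped(3)                            // batches of at most 3 elements
      .runWith(Sink.seq)

    try Await.result(result, 5.seconds)
    finally system.terminate()
  }
}
```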

### Stream Combining

Operations for merging, zipping, concatenating, and broadcasting streams to create complex data flow topologies.

```scala { .api }
// Combining sources (as seen on Source[Out, Mat]; optional parameters omitted)
def merge[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]
def concat[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]
def zip[U](other: Graph[SourceShape[U], _]): Source[(Out, U), Mat]

// Broadcasting and balancing: fan-out stages, typically wired with GraphDSL
// (optional parameters omitted)
object Broadcast {
  def apply[T](outputPorts: Int): Broadcast[T] // a Graph[UniformFanOutShape[T, T], NotUsed]
}
object Balance {
  def apply[T](outputPorts: Int): Balance[T] // a Graph[UniformFanOutShape[T, T], NotUsed]
}
```

[Stream Combining](./stream-combining.md)
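
The difference between the source-combining operators can be seen in a small sketch. `CombiningSketch` and the sample data are hypothetical. Note that `merge` gives no ordering guarantee across the two inputs, so only the set of merged elements is predictable.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object CombiningSketch {
  def run(): (Seq[Int], Seq[(Int, String)], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("combining-sketch")
    val a = Source(List(1, 2, 3))
    val b = Source(List(10, 20, 30))
    val letters = Source(List("a", "b"))
    try {
      // concat: all of `a`, then all of `b`
      val concatenated = Await.result(a.concat(b).runWith(Sink.seq), 5.seconds)
      // zip: pairs elements, completes when the shorter input completes
      val zipped = Await.result(a.zip(letters).runWith(Sink.seq), 5.seconds)
      // merge: interleaves as elements arrive, order across inputs not guaranteed
      val merged = Await.result(a.merge(b).runWith(Sink.seq), 5.seconds)
      (concatenated, zipped, merged)
    } finally system.terminate()
  }
}
```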

### Stream Sinks

Endpoints for consuming stream elements including collection sinks, side-effect sinks, and integration with external systems.

```scala { .api }
object Sink {
  def seq[T]: Sink[T, Future[immutable.Seq[T]]]
  def head[T]: Sink[T, Future[T]]
  def foreach[T](f: T => Unit): Sink[T, Future[Done]]
  def fold[U, T](zero: U)(f: (U, T) => U): Sink[T, Future[U]]
  def ignore: Sink[Any, Future[Done]]
}
```

[Stream Sinks](./stream-sinks.md)
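
Each sink materializes a different value, which the following sketch tries to make concrete. `SinksSketch` is an illustrative name, and the explicit type arguments on `Sink.fold` and `Sink.head` are there to keep type inference straightforward.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object SinksSketch {
  def run(): (Int, Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("sinks-sketch")
    val source = Source(1 to 5)
    try {
      // fold: Future of the accumulated value
      val sum = Await.result(source.runWith(Sink.fold[Int, Int](0)(_ + _)), 5.seconds)
      // head: Future of the first element
      val first = Await.result(source.runWith(Sink.head[Int]), 5.seconds)
      // foreach: side effect per element, Future[Done] on completion
      val seen = new AtomicInteger(0)
      Await.result(source.runWith(Sink.foreach[Int](_ => seen.incrementAndGet())), 5.seconds)
      (sum, first, seen.get())
    } finally system.terminate()
  }
}
```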

### Materialization and Execution

System for converting stream blueprints into running streams, managing resources, and controlling materialized values.

```scala { .api }
abstract class Materializer {
  def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
  def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable
}

// Materialization control
// On RunnableGraph[Mat]
def run()(implicit materializer: Materializer): Mat
// On Source[Out, Mat]
def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2
def mapMaterializedValue[Mat2](f: Mat => Mat2): Source[Out, Mat2]
```

[Materialization and Execution](./materialization.md)
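
To show how materialized values are selected and transformed, here is a minimal sketch that combines `mapMaterializedValue` with `Keep.both`. The label string and the name `MatValueSketch` are invented for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object MatValueSketch {
  def run(): (String, Int) = {
    implicit val system: ActorSystem = ActorSystem("mat-value-sketch")
    try {
      // Replace the source's NotUsed materialized value with a label.
      val source: Source[Int, String] =
        Source(1 to 3).mapMaterializedValue(_ => "source-mat")

      // Keep.both retains the materialized values of both source and sink.
      val graph: RunnableGraph[(String, Future[Int])] =
        source.toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.both)

      val (label, sum) = graph.run()
      (label, Await.result(sum, 5.seconds))
    } finally system.terminate()
  }
}
```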

### Error Handling and Supervision

Strategies for handling failures, implementing supervision, and recovering from errors in stream processing pipelines.

```scala { .api }
object Supervision {
  type Decider = Throwable => Directive

  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
}

// Error handling operators
def recover[U >: Out](pf: PartialFunction[Throwable, U]): Source[U, Mat]
def recoverWithRetries[U >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[U], NotUsed]]): Source[U, Mat]
```

[Error Handling](./error-handling.md)
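
The two approaches behave differently on failure, as the sketch below attempts to show. A supervision `Decider` attached through `ActorAttributes.supervisionStrategy` drops the failing element and continues, while `recover` emits one final element and then completes the stream. `ErrorHandlingSketch` and the division example are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object ErrorHandlingSketch {
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("error-handling-sketch")
    try {
      // Resume on division by zero, stop on anything else.
      val decider: Supervision.Decider = {
        case _: ArithmeticException => Supervision.Resume
        case _                      => Supervision.Stop
      }
      val supervisedDivide = Flow[Int]
        .map(10 / _)
        .withAttributes(ActorAttributes.supervisionStrategy(decider))

      // Supervision: the failing element (0) is dropped, the stream continues.
      val resumed =
        Await.result(Source(List(1, 0, 2)).via(supervisedDivide).runWith(Sink.seq), 5.seconds)

      // recover: the failure is replaced by one last element, then the stream completes.
      val recovered = Await.result(
        Source(List(1, 0, 2))
          .map(10 / _)
          .recover { case _: ArithmeticException => -1 }
          .runWith(Sink.seq),
        5.seconds)

      (resumed, recovered)
    } finally system.terminate()
  }
}
```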

### Stream Control and Lifecycle

Mechanisms for controlling stream lifecycle, buffering under back-pressure, rate limiting, and external stream termination.

```scala { .api }
// Flow control
def buffer(size: Int, overflowStrategy: OverflowStrategy): Source[Out, Mat]
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Source[Out, Mat]

// Kill switches for external termination
trait KillSwitch {
  def shutdown(): Unit
  def abort(ex: Throwable): Unit
}
```

[Stream Control](./stream-control.md)
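
A rough sketch of these controls follows: a buffered, throttled finite stream, and an infinite stream stopped from outside through a `UniqueKillSwitch` obtained from `KillSwitches.single`. The buffer size, rates, and the name `StreamControlSketch` are arbitrary values chosen for the example.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object StreamControlSketch {
  def run(): (Seq[Int], Done) = {
    implicit val system: ActorSystem = ActorSystem("stream-control-sketch")
    try {
      // Buffer up to 16 elements, back-pressuring upstream when full,
      // and emit at most 5 elements per 100 ms.
      val paced = Source(1 to 5)
        .buffer(16, OverflowStrategy.backpressure)
        .throttle(5, 100.millis, 5, ThrottleMode.Shaping)
        .runWith(Sink.seq)

      // An infinite stream that only ends when the kill switch is used.
      val (killSwitch, done) = Source
        .repeat(1)
        .throttle(1, 10.millis)
        .viaMat(KillSwitches.single)(Keep.right)
        .toMat(Sink.ignore)(Keep.both)
        .run()

      killSwitch.shutdown() // completes the stream normally

      (Await.result(paced, 5.seconds), Await.result(done, 5.seconds))
    } finally system.terminate()
  }
}
```

Using `abort(ex)` instead of `shutdown()` would fail the stream with the given exception rather than completing it.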

### Custom Stages

API for creating custom stream processing operators using GraphStage for advanced use cases requiring fine-grained control over stream behavior.

```scala { .api }
abstract class GraphStage[S <: Shape] extends Graph[S, NotUsed] {
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic
}

abstract class GraphStageLogic(val shape: Shape) {
  def setHandler(in: Inlet[_], handler: InHandler): Unit
  def setHandler(out: Outlet[_], handler: OutHandler): Unit
  def push[T](out: Outlet[T], elem: T): Unit
  def pull[T](in: Inlet[T]): Unit
}
```

[Custom Stages](./custom-stages.md)
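
As an illustration of the handler-based API, the sketch below defines a one-in, one-out stage that doubles each integer. The `Doubler` class, its port names, and `CustomStageSketch` are hypothetical. For a transformation this simple, `map` would normally be preferable; the stage only demonstrates the push/pull protocol.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical custom stage: doubles every element.
final class Doubler extends GraphStage[FlowShape[Int, Int]] {
  val in: Inlet[Int] = Inlet[Int]("Doubler.in")
  val out: Outlet[Int] = Outlet[Int]("Doubler.out")
  override val shape: FlowShape[Int, Int] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Upstream delivered an element: transform it and push downstream.
      setHandler(in, new InHandler {
        override def onPush(): Unit = push(out, grab(in) * 2)
      })
      // Downstream signalled demand: request one element from upstream.
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

// Hypothetical example object, not part of Akka.
object CustomStageSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("custom-stage-sketch")
    try Await.result(Source(1 to 3).via(new Doubler).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```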

### Integration

Integration with file systems, TCP/TLS networking, actors, and external Reactive Streams publishers and subscribers.

```scala { .api }
// File I/O (optional parameters omitted)
object FileIO {
  def fromPath(f: Path): Source[ByteString, Future[IOResult]]
  def toPath(f: Path): Sink[ByteString, Future[IOResult]]
}

// TCP networking: methods of the Tcp extension, obtained with Tcp(system)
// (optional parameters omitted)
final class Tcp {
  def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]]
  def bind(interface: String, port: Int): Source[IncomingConnection, Future[ServerBinding]]
}
```

[Integration](./integration.md)
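
A file round trip with `FileIO` might look like the sketch below, which writes two lines to a temporary file and reads them back. The file prefix, the contents, and the name `FileIoSketch` are made up for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString
import java.nio.file.Files
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object FileIoSketch {
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("file-io-sketch")
    val path = Files.createTempFile("akka-stream-sketch", ".txt")
    try {
      // Write: the sink materializes a Future[IOResult] that completes when writing ends.
      val written = Source(List("hello\n", "world\n"))
        .map(s => ByteString(s))
        .runWith(FileIO.toPath(path))
      Await.result(written, 5.seconds)

      // Read: the file is emitted as ByteString chunks, concatenated here.
      val read = FileIO.fromPath(path).runFold(ByteString.empty)(_ ++ _)
      Await.result(read, 5.seconds).utf8String
    } finally {
      Files.deleteIfExists(path)
      system.terminate()
    }
  }
}
```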

## Types

### Core Types

```scala { .api }
// Declarations simplified

// Materialized value of stages that produce nothing meaningful
sealed abstract class NotUsed
case object NotUsed extends NotUsed

// Completion marker
sealed abstract class Done
case object Done extends Done

// Stream shapes
final case class SourceShape[+T](out: Outlet[T]) extends Shape
final case class FlowShape[-I, +O](in: Inlet[I], out: Outlet[O]) extends Shape
final case class SinkShape[-T](in: Inlet[T]) extends Shape
sealed abstract class ClosedShape extends Shape

// Overflow strategies for buffering, obtained from the companion object
sealed abstract class OverflowStrategy
object OverflowStrategy {
  def dropHead: OverflowStrategy
  def dropTail: OverflowStrategy
  def dropBuffer: OverflowStrategy
  def dropNew: OverflowStrategy
  def backpressure: OverflowStrategy
  def fail: OverflowStrategy
}
```