Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.
—
Stream lifecycle management with KillSwitch, StreamRefs for distribution, and queue integration.
trait KillSwitch {
def shutdown(): Unit // Graceful shutdown
def abort(ex: Throwable): Unit // Abort with error
}
final class UniqueKillSwitch private[stream] extends KillSwitch
final class SharedKillSwitch private[stream] extends KillSwitch {
val name: String
def flow[T]: Graph[FlowShape[T, T], SharedKillSwitch]
}
object KillSwitches {
def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch]
def singleBidi[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch]
def shared(name: String): SharedKillSwitch
}

Unique KillSwitch:
import akka.stream.scaladsl.{Source, Sink, Keep, KillSwitches}
import scala.concurrent.duration._
// Single stream with kill switch.
// Describe the graph first: Keep.right retains the value materialized by
// KillSwitches.single, and Keep.both pairs it with the sink's value.
val pingGraph =
  Source
    .tick(1.second, 1.second, "ping")
    .viaMat(KillSwitches.single)(Keep.right)
    .toMat(Sink.foreach(println))(Keep.both)

// Running the graph yields the kill switch together with the sink's value.
val (killSwitch, done) = pingGraph.run()

// Later, terminate the stream gracefully (here: after a 5000 ms pause).
scala.concurrent.Future {
  Thread.sleep(5000)
  killSwitch.shutdown()
}
// Or abort with error
killSwitch.abort(new RuntimeException("User requested abort"))

Shared KillSwitch:
// One named switch that several independent streams can be attached to.
val sharedKillSwitch = KillSwitches.shared("my-streams")

// Each stream routes its elements through the switch's flow before printing.
val stream1 =
  Source
    .tick(1.second, 1.second, "stream1")
    .via(sharedKillSwitch.flow)
    .runWith(Sink.foreach(println))

val stream2 =
  Source
    .tick(2.second, 2.second, "stream2")
    .via(sharedKillSwitch.flow)
    .runWith(Sink.foreach(println))
// Shutdown all streams using the shared kill switch
sharedKillSwitch.shutdown()

trait SourceRef[T] {
def source: Source[T, NotUsed]
def getSource: javadsl.Source[T, NotUsed] // Java API
}
trait SinkRef[In] {
def sink(): Sink[In, NotUsed]
def getSink(): javadsl.Sink[In, NotUsed] // Java API
}
object StreamRefs {
def sourceRef[T](): Sink[T, Future[SourceRef[T]]]
def sinkRef[T](): Source[SinkRef[T], NotUsed]
}

Creating and Using SourceRef:
import akka.stream.scaladsl.StreamRefs
// Create a SourceRef for remote consumption: materializing this sink yields
// a future SourceRef for the stream that was run into it.
val sourceRefSink: Sink[String, Future[SourceRef[String]]] = StreamRefs.sourceRef()

val greetings = Source(List("hello", "world"))
val sourceRef: Future[SourceRef[String]] = greetings.runWith(sourceRefSink)
// Use the SourceRef elsewhere (possibly remote)
sourceRef.foreach { ref =>
ref.source
.runWith(Sink.foreach(println))
}

Creating and Using SinkRef:
// Create a SinkRef for remote production
val sinkRefSource: Source[SinkRef[String], NotUsed] = StreamRefs.sinkRef()
sinkRefSource.runWith(Sink.head).foreach { sinkRef =>
// Use the SinkRef to send data (possibly from remote)
Source(List("data1", "data2", "data3"))
.runWith(sinkRef.sink())
}

trait SourceQueueWithComplete[T] {
def offer(elem: T): Future[QueueOfferResult]
def complete(): Unit
def fail(ex: Throwable): Unit
def watchCompletion(): Future[Done]
}
trait SinkQueueWithCancel[T] {
def pull(): Future[Option[T]]
def cancel(): Unit
}
sealed abstract class QueueOfferResult
object QueueOfferResult {
case object Enqueued extends QueueOfferResult
case object Dropped extends QueueOfferResult
case class Failure(cause: Throwable) extends QueueOfferResult
case object QueueClosed extends QueueOfferResult
}

SourceQueue Usage:
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Source, Sink}
// Create source with queue: a String queue source (buffer size 10,
// backpressure overflow strategy) connected to a printing sink.
val queuedPrinter =
  Source
    .queue[String](10, OverflowStrategy.backpressure)
    .toMat(Sink.foreach(println))(Keep.both)

// Keep.both returns the queue handle together with the sink's value.
val (queue, done) = queuedPrinter.run()
// Offer elements dynamically; the future's result says what happened to the element.
queue.offer("hello").foreach {
  case QueueOfferResult.Enqueued    => println("Enqueued successfully")
  case QueueOfferResult.Dropped     => println("Element was dropped")
  case QueueOfferResult.QueueClosed => println("Queue is closed")
  case QueueOfferResult.Failure(ex) => println(s"Failed: $ex")
}
queue.offer("world")
queue.complete() // Signal completion

SinkQueue Usage:
// Create sink with queue: running the source into Sink.queue() materializes
// a handle that elements are pulled from on demand.
val numbers = Source(1 to 100)
val queue = numbers.runWith(Sink.queue())
// Pull elements dynamically
/**
 * Pulls one element from `queue` and, once it has arrived, schedules the next pull.
 *
 * The recursive call is made from inside the future callback, so a new pull is
 * only issued after the previous one has produced an element. `None` is
 * reported as stream completion and ends the loop. A failed pull is reported as
 * well and ends the loop: `foreach` alone only observes successful results, so
 * without the `failed` handler a failure would stop the loop without any output.
 */
def pullNext(): Unit = {
  val pulled = queue.pull()
  pulled.foreach {
    case Some(element) =>
      println(s"Pulled: $element")
      pullNext() // Continue pulling
    case None =>
      println("Stream completed")
  }
  // Surface a failed pull instead of dropping it silently.
  pulled.failed.foreach(ex => println(s"Stream failed: $ex"))
}
pullNext()

sealed trait CompletionStrategy
object CompletionStrategy {
case object Immediately extends CompletionStrategy
case object Draining extends CompletionStrategy
}

Usage Example:
// Messages that are interpreted as control signals rather than stream elements:
// "complete" is handed to the completion matcher, "fail" to the failure matcher.
val completeOn: PartialFunction[Any, CompletionStrategy] = {
  case "complete" => CompletionStrategy.Immediately
}
val failOn: PartialFunction[Any, Throwable] = {
  case "fail" => new RuntimeException("Actor requested failure")
}

// Actor-backed source with a 10-element buffer and the dropHead overflow
// strategy, connected to a printing sink.
val (actorRef, done) =
  Source
    .actorRef[String](
      completionMatcher = completeOn,
      failureMatcher = failOn,
      bufferSize = 10,
      overflowStrategy = OverflowStrategy.dropHead
    )
    .toMat(Sink.foreach(println))(Keep.both)
    .run()

// Control completion via actor messages
actorRef ! "hello"
actorRef ! "world"
actorRef ! "complete" // Triggers completion

trait FlowMonitor[T] {
def state: Future[StreamState]
}
sealed trait StreamState
case object Initializing extends StreamState
case object Running extends StreamState
case object Completed extends StreamState
case class Failed(cause: Throwable) extends StreamState

val monitoredStream = Source(1 to 10)
.monitor() { (_, monitor) =>
monitor.state.foreach { state =>
println(s"Stream state changed to: $state")
}
}
.watchTermination() { (_, done) =>
done.onComplete {
case Success(_) => println("Stream completed successfully")
case Failure(ex) => println(s"Stream failed: $ex")
}
}
.runWith(Sink.ignore)

trait FlowOps[+Out, +Mat] {
def idleTimeout(timeout: FiniteDuration): Repr[Out]
def completionTimeout(timeout: FiniteDuration): Repr[Out]
def backpressureTimeout(timeout: FiniteDuration): Repr[Out]
def keepAlive(maxIdle: FiniteDuration, injectedElem: () => Out): Repr[Out]
}

Usage Examples:
import scala.concurrent.duration._
// Timeout if no elements received within 30 seconds
val pings = Source.tick(10.seconds, 10.seconds, "ping")
val withIdleTimeout =
  pings
    .idleTimeout(30.seconds)
    .runWith(Sink.foreach(println))

// Keep alive by injecting heartbeat elements
val dataTicks = Source.tick(5.seconds, 5.seconds, "data")
val keepAliveStream =
  dataTicks
    .keepAlive(2.seconds, () => "heartbeat")
    .runWith(Sink.foreach(println))
// Timeout on overall completion
val completionTimeoutStream = Source(1 to 1000)
.throttle(1, 1.second)
.completionTimeout(10.seconds) // Fail if not completed in 10 seconds
.runWith(Sink.seq)

import scala.util.{Success, Failure}
// Proper resource cleanup with monitoring
/**
 * Builds a source whose backing resource is opened on demand and closed when
 * the stream terminates.
 *
 * `resource` is a by-name argument that is evaluated inside the
 * `Source.lazySource` factory, so nothing is opened while the source is merely
 * being described. The resource is closed in two situations:
 *
 *  - when the termination future supplied by `watchTermination` completes,
 *    whether the stream succeeded or failed;
 *  - when `streamFactory` itself throws, because in that case no stream exists
 *    whose termination could trigger the cleanup.
 *
 * `onComplete` needs an implicit `ExecutionContext`, which must be available in
 * the enclosing scope since this method does not take one as a parameter.
 *
 * @tparam T            element type emitted by the resulting source
 * @param resource      expression that opens the resource; evaluated once per
 *                      invocation of the lazy factory
 * @param streamFactory builds the actual stream from the opened resource
 * @return a source emitting the elements of the stream built by `streamFactory`;
 *         the inner materialized value is discarded in favour of `NotUsed`
 */
def createManagedStream[T](resource: => AutoCloseable)(
    streamFactory: AutoCloseable => Source[T, NotUsed]
): Source[T, NotUsed] =
  Source
    .lazySource { () =>
      val res = resource
      val managed =
        try streamFactory(res)
        catch {
          // The factory failed after the resource was opened: release it
          // before propagating, otherwise nothing would ever close it.
          case scala.util.control.NonFatal(e) =>
            res.close()
            throw e
        }
      managed.watchTermination() { (mat, done) =>
        // Success and failure need the same cleanup, so the outcome is ignored.
        done.onComplete(_ => res.close())
        mat
      }
    }
    // lazySource materializes a future of the inner value; callers only ever
    // received NotUsed, so it is replaced directly rather than through GraphDSL.
    .mapMaterializedValue(_ => NotUsed)
// Usage
val managedFileStream = createManagedStream {
new FileInputStream("data.txt")
} { inputStream =>
StreamConverters.fromInputStream(() => inputStream)
.via(Framing.delimiter(ByteString("\n"), 1024))
.map(_.utf8String)
}

This control flow system provides comprehensive lifecycle management while maintaining the reactive streams semantics and backpressure throughout the stream processing pipeline.
Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5