CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.

Pending
Overview
Eval results
Files

control-flow.mddocs/

Control Flow and Lifecycle

Stream lifecycle management with KillSwitch, StreamRefs for distribution, and queue integration.

KillSwitch - Stream Termination Control

Base KillSwitch Interface

trait KillSwitch {
  def shutdown(): Unit           // Graceful shutdown
  def abort(ex: Throwable): Unit // Abort with error
}

UniqueKillSwitch - Single Stream Control

final class UniqueKillSwitch private[stream] extends KillSwitch

SharedKillSwitch - Multi-Stream Control

final class SharedKillSwitch private[stream] extends KillSwitch {
  val name: String
  def flow[T]: Graph[FlowShape[T, T], SharedKillSwitch]
}

KillSwitches Factory

object KillSwitches {
  def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch]
  def singleBidi[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch]
  def shared(name: String): SharedKillSwitch
}

Usage Examples

Unique KillSwitch:

import akka.stream.scaladsl.{Source, Sink, Keep, KillSwitches}
import scala.concurrent.duration._

// Single stream with kill switch
val (killSwitch, done) = Source.tick(1.second, 1.second, "ping")
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.foreach(println))(Keep.both)
  .run()

// Later, terminate the stream gracefully
scala.concurrent.Future {
  Thread.sleep(5000)
  killSwitch.shutdown()
}

// Or abort with error
killSwitch.abort(new RuntimeException("User requested abort"))

Shared KillSwitch:

// Create shared kill switch for multiple streams
val sharedKillSwitch = KillSwitches.shared("my-streams")

// Use in multiple streams
val stream1 = Source.tick(1.second, 1.second, "stream1")
  .via(sharedKillSwitch.flow)
  .runWith(Sink.foreach(println))

val stream2 = Source.tick(2.second, 2.second, "stream2") 
  .via(sharedKillSwitch.flow)
  .runWith(Sink.foreach(println))

// Shutdown all streams using the shared kill switch
sharedKillSwitch.shutdown()

StreamRefs - Distributed Streaming

SourceRef - Remote Source Reference

trait SourceRef[T] {
  def source: Source[T, NotUsed]
  def getSource: javadsl.Source[T, NotUsed] // Java API
}

SinkRef - Remote Sink Reference

trait SinkRef[In] {
  def sink(): Sink[In, NotUsed]
  def getSink(): javadsl.Sink[In, NotUsed] // Java API
}

StreamRefs Factory

object StreamRefs {
  def sourceRef[T](): Sink[T, Future[SourceRef[T]]]
  def sinkRef[T](): Source[SinkRef[T], NotUsed]
}

Usage Examples

Creating and Using SourceRef:

import akka.stream.scaladsl.StreamRefs

// Create a SourceRef for remote consumption
val sourceRefSink: Sink[String, Future[SourceRef[String]]] = StreamRefs.sourceRef()

val (sourceRef: Future[SourceRef[String]]) = Source(List("hello", "world"))
  .runWith(sourceRefSink)

// Use the SourceRef elsewhere (possibly remote)
sourceRef.foreach { ref =>
  ref.source
    .runWith(Sink.foreach(println))
}

Creating and Using SinkRef:

// Create a SinkRef for remote production
val sinkRefSource: Source[SinkRef[String], NotUsed] = StreamRefs.sinkRef()

sinkRefSource.runWith(Sink.head).foreach { sinkRef =>
  // Use the SinkRef to send data (possibly from remote)
  Source(List("data1", "data2", "data3"))
    .runWith(sinkRef.sink())
}

Queue Integration

SourceQueue - Dynamic Element Injection

trait SourceQueueWithComplete[T] {
  def offer(elem: T): Future[QueueOfferResult]
  def complete(): Unit
  def fail(ex: Throwable): Unit  
  def watchCompletion(): Future[Done]
}

SinkQueue - Dynamic Element Extraction

trait SinkQueueWithCancel[T] {
  def pull(): Future[Option[T]]
  def cancel(): Unit
}

QueueOfferResult Types

sealed abstract class QueueOfferResult

object QueueOfferResult {
  case object Enqueued extends QueueOfferResult
  case object Dropped extends QueueOfferResult
  case class Failure(cause: Throwable) extends QueueOfferResult  
  case object QueueClosed extends QueueOfferResult
}

Usage Examples

SourceQueue Usage:

import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Source, Sink}

// Create source with queue
val (queue, done) = Source.queue[String](10, OverflowStrategy.backpressure)
  .toMat(Sink.foreach(println))(Keep.both)
  .run()

// Offer elements dynamically
queue.offer("hello").foreach { result =>
  result match {
    case QueueOfferResult.Enqueued => println("Enqueued successfully")
    case QueueOfferResult.Dropped => println("Element was dropped")
    case QueueOfferResult.QueueClosed => println("Queue is closed")
    case QueueOfferResult.Failure(ex) => println(s"Failed: $ex")
  }
}

queue.offer("world")
queue.complete() // Signal completion

SinkQueue Usage:

// Create sink with queue
val queue = Source(1 to 100)
  .runWith(Sink.queue())

// Pull elements dynamically
def pullNext(): Unit = {
  queue.pull().foreach {
    case Some(element) => 
      println(s"Pulled: $element")
      pullNext() // Continue pulling
    case None => 
      println("Stream completed")
  }
}

pullNext()

Completion Strategies

CompletionStrategy Types

sealed trait CompletionStrategy

object CompletionStrategy {
  case object Immediately extends CompletionStrategy
  case object Draining extends CompletionStrategy
}

Usage Example:

val (actorRef, done) = Source.actorRef[String](
  completionMatcher = {
    case "complete" => CompletionStrategy.Immediately
  },
  failureMatcher = {
    case "fail" => new RuntimeException("Actor requested failure")
  },
  bufferSize = 10,
  overflowStrategy = OverflowStrategy.dropHead
).toMat(Sink.foreach(println))(Keep.both).run()

// Control completion via actor messages
actorRef ! "hello"
actorRef ! "world"  
actorRef ! "complete" // Triggers completion

Stream Monitoring and Lifecycle

Flow Monitoring

trait FlowMonitor[T] {
  def state: Future[StreamState]
}

sealed trait StreamState
case object Initializing extends StreamState
case object Running extends StreamState
case object Completed extends StreamState
case class Failed(cause: Throwable) extends StreamState

Lifecycle Hooks

val monitoredStream = Source(1 to 10)
  .monitor() { (_, monitor) =>
    monitor.state.foreach { state =>
      println(s"Stream state changed to: $state")
    }
  }
  .watchTermination() { (_, done) =>
    done.onComplete {
      case Success(_) => println("Stream completed successfully")  
      case Failure(ex) => println(s"Stream failed: $ex")
    }
  }
  .runWith(Sink.ignore)

Timeouts and Keep-Alive

Timeout Operations

trait FlowOps[+Out, +Mat] {
  def idleTimeout(timeout: FiniteDuration): Repr[Out]
  def completionTimeout(timeout: FiniteDuration): Repr[Out]
  def backpressureTimeout(timeout: FiniteDuration): Repr[Out]
  def keepAlive(maxIdle: FiniteDuration, injectedElem: () => Out): Repr[Out]
}

Usage Examples:

import scala.concurrent.duration._

// Timeout if no elements received within 30 seconds
val withIdleTimeout = Source.tick(10.seconds, 10.seconds, "ping")
  .idleTimeout(30.seconds)
  .runWith(Sink.foreach(println))

// Keep alive by injecting heartbeat elements
val keepAliveStream = Source.tick(5.seconds, 5.seconds, "data")
  .keepAlive(2.seconds, () => "heartbeat")
  .runWith(Sink.foreach(println))

// Timeout on overall completion
val completionTimeoutStream = Source(1 to 1000)
  .throttle(1, 1.second)
  .completionTimeout(10.seconds) // Fail if not completed in 10 seconds
  .runWith(Sink.seq)

Resource Management

Resource Cleanup Patterns

import scala.util.{Success, Failure}

// Proper resource cleanup with monitoring
def createManagedStream[T](resource: => AutoCloseable)(
  streamFactory: AutoCloseable => Source[T, NotUsed]
): Source[T, NotUsed] = {
  
  Source.fromGraph(GraphDSL.create() { implicit builder =>
    val src = Source.lazySource { () =>
      val res = resource
      streamFactory(res)
        .watchTermination() { (_, done) =>
          done.onComplete {
            case Success(_) => res.close()
            case Failure(_) => res.close()
          }
          NotUsed
        }
    }
    
    val shape = builder.add(src)
    SourceShape(shape.out)
  })
}

// Usage
val managedFileStream = createManagedStream {
  new FileInputStream("data.txt")
} { inputStream =>
  StreamConverters.fromInputStream(() => inputStream)
    .via(Framing.delimiter(ByteString("\n"), 1024))
    .map(_.utf8String)
}

This control flow system provides comprehensive lifecycle management while maintaining the reactive streams semantics and backpressure throughout the stream processing pipeline.

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

docs

control-flow.md

core-components.md

error-handling.md

graph-building.md

index.md

io-integration.md

junction-operations.md

materialization.md

stream-operations.md

tile.json