or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

actor-system.mdactors-props.mdfsm.mdindex.mdmessaging-references.mdpatterns-utilities.mdscheduling-timers.mdsupervision.md
tile.json

patterns-utilities.mddocs/

Patterns and Utilities

Essential patterns and utilities for Akka Actor including ask pattern, circuit breaker, graceful shutdown, and timeout handling.

Capabilities

Ask Pattern

Request-response pattern for obtaining Future-based replies from actors.

/**
 * Ask pattern support trait
 */
trait AskSupport {
  /**
   * Send message and return Future of response
   * @param actorRef - Target actor
   * @param message - Message to send
   * @param timeout - Request timeout
   * @return Future[Any] containing response
   */
  def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any]
}

/**
 * Implicit class adding ask capability to ActorRef
 */  
final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
  /**
   * Ask operator - send message and return Future
   * @param message - Message to send
   * @param timeout - Implicit timeout
   * @param sender - Optional sender (defaults to Actor.noSender)
   * @return Future[Any] containing response
   */
  def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any]
  
  /**
   * Ask method - send message and return Future
   * @param message - Message to send  
   * @param timeout - Implicit timeout
   * @param sender - Optional sender (defaults to Actor.noSender)
   * @return Future[Any] containing response
   */
  def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any]
}

/**
 * Java API for ask pattern
 */
object Patterns {
  /**
   * Java ask method returning CompletionStage
   * @param actor - Target actor
   * @param message - Message to send
   * @param timeout - Request timeout
   * @return CompletionStage[AnyRef] containing response
   */
  def ask(actor: ActorRef, message: Any, timeout: Timeout): CompletionStage[AnyRef]
  
  /**
   * Java ask with duration timeout
   * @param actor - Target actor
   * @param message - Message to send
   * @param timeout - Request timeout duration (java.time.Duration)
   * @return CompletionStage[AnyRef] containing response
   */
  def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef]
}

Usage Examples:

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.Future

// Scala ask pattern
implicit val timeout = Timeout(5.seconds)
val system = ActorSystem("AskExample")

class ResponseActor extends Actor {
  def receive = {
    case "ping" => sender() ! "pong"
    case x: Int => sender() ! (x * 2)
    case "fail" => throw new RuntimeException("Ask failure")
  }
}

val actor = system.actorOf(Props[ResponseActor])

// Using ? operator
val future1: Future[Any] = actor ? "ping"
future1.foreach(println) // Prints: pong

// Using ask method
val future2: Future[Any] = ask(actor, 42)
future2.foreach(println) // Prints: 84

// Type casting response
val future3: Future[String] = (actor ? "ping").mapTo[String]

// Error handling
val future4: Future[Any] = actor ? "fail"
future4.recover {
  case _: AskTimeoutException => "Request timed out"
  case ex => s"Request failed: ${ex.getMessage}"
}

// Java API
import akka.pattern.Patterns
val javaFuture = Patterns.ask(actor, "ping", timeout)

Pipe Pattern

Pipeline Future results to actors for asynchronous response handling.

/**
 * Pipe pattern support trait
 */
trait PipeToSupport {
  /**
   * Implicit class adding pipe capability to Future
   */
  implicit class PipeableFuture[T](val future: Future[T]) extends AnyVal {
    /**
     * Send Future result to actor when complete
     * @param recipient - Target actor
     * @param sender - Implicit sender reference
     * @return Original Future
     */
    def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[T]
    
    /**
     * Send Future result to actor selection when complete
     * @param recipient - Target actor selection
     * @param sender - Implicit sender reference  
     * @return Original Future
     */
    def pipeTo(recipient: ActorSelection)(implicit sender: ActorRef = Actor.noSender): Future[T]
  }
}

/**
 * Java API for pipe pattern
 */
object Patterns {
  /**
   * Pipe CompletionStage result to actor
   * @param future - CompletionStage to pipe
   * @param recipient - Target actor
   * @param context - Execution context
   * @return Original CompletionStage
   */
  def pipe[T](
    future: CompletionStage[T], 
    recipient: ActorRef
  )(implicit context: ExecutionContext): CompletionStage[T]
}

Usage Examples:

import akka.pattern.pipe
import scala.concurrent.Future

class PipeActor extends Actor {
  import context.dispatcher
  
  def receive = {
    case "start" =>
      // Start async operation and pipe result back to self
      val futureResult: Future[String] = asyncOperation()
      futureResult.pipeTo(self)
      
    case result: String =>
      println(s"Received async result: $result")
      
    case Status.Failure(ex) =>
      println(s"Async operation failed: $ex")
  }
  
  private def asyncOperation(): Future[String] = {
    Future {
      Thread.sleep(1000)
      "Async operation completed"
    }
  }
}

// Pipe to different actor
class RequestActor extends Actor {
  import context.dispatcher
  
  def receive = {
    case "process" =>
      val originalSender = sender()
      val futureResult = expensiveComputation()
      futureResult.pipeTo(originalSender)
  }
  
  private def expensiveComputation(): Future[Int] = {
    Future(42)
  }
}

Circuit Breaker

Fault tolerance pattern preventing cascading failures in distributed systems.

/**
 * Circuit breaker implementation for fault tolerance
 */
class CircuitBreaker(
  scheduler: Scheduler,
  maxFailures: Int,
  callTimeout: Duration,
  resetTimeout: Duration
) {
  /**
   * Execute operation with circuit breaker protection
   * @param body - Async operation to protect
   * @return Future[T] with circuit breaker semantics
   */
  def withCircuitBreaker[T](body: => Future[T]): Future[T]
  
  /**
   * Execute synchronous operation with circuit breaker
   * @param body - Sync operation to protect
   * @return T with circuit breaker semantics
   */
  def withSyncCircuitBreaker[T](body: => T): T
  
  /**
   * Java API - execute with circuit breaker
   * @param body - Callable returning CompletionStage
   * @return CompletionStage[T] with protection
   */
  def callWithCircuitBreakerCS[T](body: Callable[CompletionStage[T]]): CompletionStage[T]
  
  /**
   * Register callback for circuit open state
   * @param callback - Code to execute when circuit opens
   * @return This CircuitBreaker for chaining
   */
  def onOpen(callback: => Unit): CircuitBreaker
  
  /**
   * Register callback for circuit closed state
   * @param callback - Code to execute when circuit closes
   * @return This CircuitBreaker for chaining
   */
  def onClose(callback: => Unit): CircuitBreaker
  
  /**
   * Register callback for circuit half-open state
   * @param callback - Code to execute when circuit half-opens
   * @return This CircuitBreaker for chaining
   */
  def onHalfOpen(callback: => Unit): CircuitBreaker
  
  /**
   * Current failure count
   */
  def currentFailureCount: Int
  
  /**
   * Whether circuit is currently open
   */
  def isOpen: Boolean
  
  /**
   * Whether circuit is currently half-open
   */
  def isHalfOpen: Boolean
  
  /**
   * Whether circuit is currently closed
   */
  def isClosed: Boolean
}

Usage Examples:

import akka.pattern.CircuitBreaker
import scala.concurrent.duration._

class ServiceActor extends Actor {
  import context.dispatcher
  
  // Configure circuit breaker
  val breaker = new CircuitBreaker(
    scheduler = context.system.scheduler,
    maxFailures = 5,
    callTimeout = 10.seconds,  
    resetTimeout = 1.minute
  ).onOpen {
    println("Circuit breaker opened")
  }.onClose {
    println("Circuit breaker closed")  
  }.onHalfOpen {
    println("Circuit breaker half-open")
  }
  
  def receive = {
    case "call-service" =>
      val originalSender = sender()
      
      // Protect external service call
      val protectedCall = breaker.withCircuitBreaker {
        callExternalService()
      }
      
      protectedCall.pipeTo(originalSender)
      
    case "sync-call" =>
      try {
        val result = breaker.withSyncCircuitBreaker {
          expensiveBlockingOperation()
        }
        sender() ! result
      } catch {
        case _: CircuitBreakerOpenException =>
          sender() ! "Service unavailable - circuit open"
      }
  }
  
  private def callExternalService(): Future[String] = {
    // Simulate external service call that might fail
    Future {
      if (math.random() < 0.3) throw new RuntimeException("Service error")
      "Service response"
    }
  }
  
  private def expensiveBlockingOperation(): String = {
    Thread.sleep(1000)
    "Blocking result"
  }
}

Graceful Stop

Controlled actor shutdown with optional stop message and timeout.

/**
 * Graceful stop support
 */
trait GracefulStopSupport {
  /**
   * Stop actor gracefully with timeout
   * @param actor - Actor to stop
   * @param timeout - Maximum wait time
   * @param stopMessage - Optional custom stop message
   * @return Future[Boolean] - true if stopped within timeout
   */
  def gracefulStop(
    actor: ActorRef,
    timeout: Duration,
    stopMessage: Any = PoisonPill
  ): Future[Boolean]
}

/**
 * Java API for graceful stop
 */
object Patterns {
  /**
   * Java graceful stop method
   * @param actor - Actor to stop
   * @param duration - Timeout duration
   * @param stopMessage - Stop message to send
   * @return CompletionStage[Boolean] indicating success
   */
  def gracefulStop(
    actor: ActorRef,
    duration: Duration,
    stopMessage: AnyRef
  ): CompletionStage[java.lang.Boolean]
}

Usage Examples:

import akka.pattern.gracefulStop
import scala.concurrent.duration._

class GracefulActor extends Actor {
  def receive = {
    case "shutdown" =>
      // Custom shutdown logic
      println("Performing cleanup...")
      context.stop(self)
    case PoisonPill =>
      // Handle poison pill
      println("Graceful shutdown requested")
      context.stop(self)
    case msg =>
      println(s"Processing: $msg")
  }
}

// Using graceful stop
val actor = system.actorOf(Props[GracefulActor])

// Stop with default PoisonPill
val stopped1: Future[Boolean] = gracefulStop(actor, 5.seconds)
stopped1.foreach { success =>
  if (success) println("Actor stopped gracefully")
  else println("Actor stop timed out")
}

// Stop with custom message
val stopped2: Future[Boolean] = gracefulStop(actor, 3.seconds, "shutdown")

Timeout Utilities

Timeout value wrapper for ask pattern and other timed operations.

/**
 * Timeout value wrapper
 */
case class Timeout(duration: Duration) {
  /**
   * Create timeout from length and time unit
   * @param length - Timeout length
   * @param unit - Time unit
   * @return New Timeout
   */
  def apply(length: Long, unit: TimeUnit): Timeout = Timeout(Duration(length, unit))
  
  /**
   * Convert to Java Duration
   */
  def asJava: java.time.Duration = duration.asJava
}

object Timeout {
  /**
   * Create timeout from duration
   * @param duration - Timeout duration
   * @return Timeout instance
   */
  def apply(duration: Duration): Timeout = new Timeout(duration)
  
  /**
   * Create timeout from length and unit
   * @param length - Timeout length
   * @param unit - Time unit
   * @return Timeout instance
   */
  def apply(length: Long, unit: TimeUnit): Timeout = 
    new Timeout(Duration(length, unit))
  
  /**
   * Create timeout from Java duration
   * @param duration - Java Duration
   * @return Timeout instance
   */
  def create(duration: java.time.Duration): Timeout = 
    new Timeout(Duration.fromNanos(duration.toNanos))
  
  /**
   * Implicit conversion from Duration
   * @param duration - Duration to convert
   * @return Timeout instance
   */
  implicit def durationToTimeout(duration: Duration): Timeout = 
    new Timeout(duration)
}

Usage Examples:

import akka.util.Timeout
import scala.concurrent.duration._

// Creating timeouts
implicit val timeout1 = Timeout(5.seconds)
implicit val timeout2 = Timeout(3000, TimeUnit.MILLISECONDS)
implicit val timeout3 = Timeout.create(java.time.Duration.ofSeconds(10))

// Using with ask pattern
val future = actor ? "request" // Uses implicit timeout

// Explicit timeout
val explicitFuture = actor.ask("request")(Timeout(2.seconds))

// Java API
val javaTimeout = Timeout.create(java.time.Duration.ofMinutes(1))

Retry Support

Utilities for retrying failed operations with configurable strategies.

/**
 * Retry support utilities
 */
trait RetrySupport {
  /**
   * Retry async operation with backoff
   * @param operation - Operation to retry
   * @param maxRetries - Maximum retry attempts
   * @param initialDelay - Initial delay between retries
   * @param backoffFactor - Exponential backoff factor
   * @param maxDelay - Maximum delay between retries
   * @return Future[T] with retry logic
   */
  def retry[T](
    operation: () => Future[T],
    maxRetries: Int,
    initialDelay: Duration,
    backoffFactor: Double = 2.0,
    maxDelay: Duration = Duration.Inf
  )(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T]
}

Types

/**
 * Exception thrown when ask times out
 */
class AskTimeoutException(message: String) extends TimeoutException(message) {
  def this(sender: ActorRef, recipient: ActorRef, timeout: Duration) = 
    this(s"Ask timed out on [$recipient] after [$timeout]. Message from [$sender]")
}

/**
 * Exception thrown when circuit breaker is open
 */
class CircuitBreakerOpenException(message: String) extends RuntimeException(message)

/**
 * Status messages for operation results
 */
object Status {
  /**
   * Success status wrapper
   */
  final case class Success(status: Any) extends Status
  
  /**
   * Failure status wrapper  
   */
  final case class Failure(cause: Throwable) extends Status
  
  sealed trait Status extends Serializable
}

/**
 * Future timeout support
 */
trait FutureTimeoutSupport {
  /**
   * Add timeout to Future
   * @param future - Future to timeout
   * @param timeout - Timeout duration
   * @return Future[T] that fails with TimeoutException if not completed in time
   */
  def after[T](future: Future[T], timeout: Duration)
              (implicit ec: ExecutionContext, scheduler: Scheduler): Future[T]
}