Essential patterns and utilities for Akka actors, including the ask pattern, circuit breaker, graceful shutdown, and timeout handling.
Request-response pattern for obtaining Future-based replies from actors.
/**
* Ask pattern support trait.
*
* Declares the standalone form of ask: the target actor is passed explicitly
* and the reply is exposed as an untyped Future.
*/
trait AskSupport {
/**
* Send message and return Future of response
* @param actorRef - Target actor
* @param message - Message to send
* @param timeout - Request timeout (implicit parameter, resolved from the caller's scope)
* @return Future[Any] containing response; the static type is Any, so callers narrow it themselves
*/
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any]
}
/**
* Wrapper adding ask capability to ActorRef.
*
* Declared as a value class (`extends AnyVal`) around the target `actorRef`.
* NOTE(review): the class itself is not marked `implicit` here; the implicit
* conversion from ActorRef is presumably defined elsewhere - confirm.
*/
final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
/**
* Ask operator - send message and return Future
* @param message - Message to send
* @param timeout - Implicit timeout
* @param sender - Optional sender (defaults to Actor.noSender)
* @return Future[Any] containing response
*/
def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any]
/**
* Ask method - send message and return Future.
* Has the same parameter lists and return type as the `?` operator.
* @param message - Message to send
* @param timeout - Implicit timeout
* @param sender - Optional sender (defaults to Actor.noSender)
* @return Future[Any] containing response
*/
def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any]
}
/**
* Java API for ask pattern
*/
object Patterns {
/**
* Java ask method returning CompletionStage
* @param actor - Target actor
* @param message - Message to send
* @param timeout - Request timeout
* @return CompletionStage[AnyRef] containing response
*/
def ask(actor: ActorRef, message: Any, timeout: Timeout): CompletionStage[AnyRef]
/**
* Java ask with duration timeout
* @param actor - Target actor
* @param message - Message to send
* @param timeout - Request timeout duration (java.time.Duration)
* @return CompletionStage[AnyRef] containing response
*/
def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef]
}

Usage Examples:
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.Future
// Scala ask pattern
// Implicit definitions carry an explicit type so resolution never depends on inference
implicit val timeout: Timeout = Timeout(5.seconds)
val system = ActorSystem("AskExample")
/**
 * Demo actor that answers ask requests.
 *
 * Replies "pong" to "ping", replies with the doubled value to any Int, and
 * for "fail" replies with a Status.Failure so the asker's Future completes
 * with the error.
 */
class ResponseActor extends Actor {
  def receive: Receive = {
    case "ping" => sender() ! "pong"
    case x: Int => sender() ! (x * 2)
    case "fail" =>
      // Throwing inside receive sends no reply: the asker would only observe
      // an AskTimeoutException once the timeout elapses and would never see
      // "Ask failure". Replying with Status.Failure fails the Future directly.
      sender() ! akka.actor.Status.Failure(new RuntimeException("Ask failure"))
  }
}
val actor = system.actorOf(Props[ResponseActor])
// Using ? operator
val future1: Future[Any] = actor ? "ping"
future1.foreach(println) // Prints: pong
// Using ask method
val future2: Future[Any] = ask(actor, 42)
future2.foreach(println) // Prints: 84
// Type casting response
val future3: Future[String] = (actor ? "ping").mapTo[String]
// Error handling
val future4: Future[Any] = actor ? "fail"
future4.recover {
case _: AskTimeoutException => "Request timed out"
case ex => s"Request failed: ${ex.getMessage}"
}
// Java API
import akka.pattern.Patterns
val javaFuture = Patterns.ask(actor, "ping", timeout)

Pipe Future results to actors for asynchronous response handling.
/**
* Pipe pattern support trait.
*
* Provides the `PipeableFuture` enrichment, whose `pipeTo` is overloaded for
* an ActorRef recipient and for an ActorSelection recipient.
*/
trait PipeToSupport {
/**
* Implicit class adding pipe capability to Future.
* Declared as a value class (`extends AnyVal`) around the wrapped `future`.
*/
implicit class PipeableFuture[T](val future: Future[T]) extends AnyVal {
/**
* Send Future result to actor when complete
* @param recipient - Target actor
* @param sender - Implicit sender reference (defaults to Actor.noSender)
* @return Original Future
*/
def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[T]
/**
* Send Future result to actor selection when complete
* @param recipient - Target actor selection
* @param sender - Implicit sender reference (defaults to Actor.noSender)
* @return Original Future
*/
def pipeTo(recipient: ActorSelection)(implicit sender: ActorRef = Actor.noSender): Future[T]
}
}
/**
* Java API for pipe pattern
*/
object Patterns {
/**
* Pipe CompletionStage result to actor
* @param future - CompletionStage to pipe
* @param recipient - Target actor
* @param context - Execution context
* @return Original CompletionStage
*/
def pipe[T](
future: CompletionStage[T],
recipient: ActorRef
)(implicit context: ExecutionContext): CompletionStage[T]
}

Usage Examples:
import akka.pattern.pipe
import scala.concurrent.Future
/**
 * Actor that starts an asynchronous operation on "start" and pipes the
 * resulting Future back to itself, handling the outcome as a regular message.
 */
class PipeActor extends Actor {
  import context.dispatcher

  // Simulated slow operation; the Future runs on the imported context.dispatcher.
  private def asyncOperation(): Future[String] =
    Future {
      Thread.sleep(1000)
      "Async operation completed"
    }

  def receive = {
    case "start" =>
      // Kick off the async work and pipe its eventual result back to self
      asyncOperation().pipeTo(self)

    case Status.Failure(ex) =>
      println(s"Async operation failed: $ex")

    case result: String =>
      println(s"Received async result: $result")
  }
}
// Pipe to different actor
class RequestActor extends Actor {
import context.dispatcher
def receive = {
case "process" =>
val originalSender = sender()
val futureResult = expensiveComputation()
futureResult.pipeTo(originalSender)
}
private def expensiveComputation(): Future[Int] = {
Future(42)
}
}

Fault tolerance pattern preventing cascading failures in distributed systems.
/**
* Circuit breaker implementation for fault tolerance
*/
class CircuitBreaker(
scheduler: Scheduler,
maxFailures: Int,
callTimeout: Duration,
resetTimeout: Duration
) {
/**
* Execute operation with circuit breaker protection
* @param body - Async operation to protect
* @return Future[T] with circuit breaker semantics
*/
def withCircuitBreaker[T](body: => Future[T]): Future[T]
/**
* Execute synchronous operation with circuit breaker
* @param body - Sync operation to protect
* @return T with circuit breaker semantics
*/
def withSyncCircuitBreaker[T](body: => T): T
/**
* Java API - execute with circuit breaker
* @param body - Callable returning CompletionStage
* @return CompletionStage[T] with protection
*/
def callWithCircuitBreakerCS[T](body: Callable[CompletionStage[T]]): CompletionStage[T]
/**
* Register callback for circuit open state
* @param callback - Code to execute when circuit opens
* @return This CircuitBreaker for chaining
*/
def onOpen(callback: => Unit): CircuitBreaker
/**
* Register callback for circuit closed state
* @param callback - Code to execute when circuit closes
* @return This CircuitBreaker for chaining
*/
def onClose(callback: => Unit): CircuitBreaker
/**
* Register callback for circuit half-open state
* @param callback - Code to execute when circuit half-opens
* @return This CircuitBreaker for chaining
*/
def onHalfOpen(callback: => Unit): CircuitBreaker
/**
* Current failure count
*/
def currentFailureCount: Int
/**
* Whether circuit is currently open
*/
def isOpen: Boolean
/**
* Whether circuit is currently half-open
*/
def isHalfOpen: Boolean
/**
* Whether circuit is currently closed
*/
def isClosed: Boolean
}

Usage Examples:
import akka.pattern.CircuitBreaker
import scala.concurrent.duration._
class ServiceActor extends Actor {
import context.dispatcher
// Configure circuit breaker
val breaker = new CircuitBreaker(
scheduler = context.system.scheduler,
maxFailures = 5,
callTimeout = 10.seconds,
resetTimeout = 1.minute
).onOpen {
println("Circuit breaker opened")
}.onClose {
println("Circuit breaker closed")
}.onHalfOpen {
println("Circuit breaker half-open")
}
def receive = {
case "call-service" =>
val originalSender = sender()
// Protect external service call
val protectedCall = breaker.withCircuitBreaker {
callExternalService()
}
protectedCall.pipeTo(originalSender)
case "sync-call" =>
try {
val result = breaker.withSyncCircuitBreaker {
expensiveBlockingOperation()
}
sender() ! result
} catch {
case _: CircuitBreakerOpenException =>
sender() ! "Service unavailable - circuit open"
}
}
private def callExternalService(): Future[String] = {
// Simulate external service call that might fail
Future {
if (math.random() < 0.3) throw new RuntimeException("Service error")
"Service response"
}
}
private def expensiveBlockingOperation(): String = {
Thread.sleep(1000)
"Blocking result"
}
}

Controlled actor shutdown with optional stop message and timeout.
/**
* Graceful stop support.
*
* Declares a single operation that asks an actor to stop and reports, as a
* Future, whether it did so within the given time.
*/
trait GracefulStopSupport {
/**
* Stop actor gracefully with timeout
* @param actor - Actor to stop
* @param timeout - Maximum wait time
* @param stopMessage - Message sent to request the stop; defaults to PoisonPill when omitted
* @return Future[Boolean] - true if stopped within timeout
*/
def gracefulStop(
actor: ActorRef,
timeout: Duration,
stopMessage: Any = PoisonPill
): Future[Boolean]
}
/**
* Java API for graceful stop
*/
object Patterns {
/**
* Java graceful stop method
* @param actor - Actor to stop
* @param duration - Timeout duration
* @param stopMessage - Stop message to send
* @return CompletionStage[Boolean] indicating success
*/
def gracefulStop(
actor: ActorRef,
duration: Duration,
stopMessage: AnyRef
): CompletionStage[java.lang.Boolean]
}

Usage Examples:
import akka.pattern.gracefulStop
import scala.concurrent.duration._
/**
 * Demo actor that supports both a custom "shutdown" message and the default
 * PoisonPill stop message.
 *
 * Cleanup lives in postStop rather than in a `case PoisonPill` handler:
 * PoisonPill is handled by the actor runtime itself and is never passed to
 * `receive`, so such a case would be dead code. postStop runs however the
 * actor is stopped, so both shutdown paths perform the same cleanup.
 */
class GracefulActor extends Actor {
  def receive: Receive = {
    case "shutdown" =>
      // Custom stop message: acknowledge the request, then stop this actor
      println("Graceful shutdown requested")
      context.stop(self)
    case msg =>
      println(s"Processing: $msg")
  }

  /** Runs once the actor has stopped, regardless of what triggered the stop. */
  override def postStop(): Unit =
    println("Performing cleanup...")
}
// Using graceful stop
val actor = system.actorOf(Props[GracefulActor])
// Stop with default PoisonPill
val stopped1: Future[Boolean] = gracefulStop(actor, 5.seconds)
stopped1.foreach { success =>
if (success) println("Actor stopped gracefully")
else println("Actor stop timed out")
}
// Stop with custom message
val stopped2: Future[Boolean] = gracefulStop(actor, 3.seconds, "shutdown")

Timeout value wrapper for ask pattern and other timed operations.
/**
* Timeout value wrapper around a single `duration`.
*/
case class Timeout(duration: Duration) {
/**
* Create timeout from length and time unit.
*
* NOTE(review): this is an instance method, so it is invoked on an existing
* Timeout (e.g. `someTimeout(5, unit)`), and its body does not read this
* instance's `duration`. The companion object declares a factory with the
* same parameters; confirm whether this member was meant to be a constructor.
* @param length - Timeout length
* @param unit - Time unit
* @return New Timeout
*/
def apply(length: Long, unit: TimeUnit): Timeout = Timeout(Duration(length, unit))
/**
* Convert to Java Duration.
*
* NOTE(review): `asJava` is not a member of scala.concurrent.duration.Duration
* itself; presumably an enrichment is imported elsewhere - confirm.
*/
def asJava: java.time.Duration = duration.asJava
}
object Timeout {
/**
* Create timeout from duration
* @param duration - Timeout duration
* @return Timeout instance
*/
def apply(duration: Duration): Timeout = new Timeout(duration)
/**
* Create timeout from length and unit
* @param length - Timeout length
* @param unit - Time unit
* @return Timeout instance
*/
def apply(length: Long, unit: TimeUnit): Timeout =
new Timeout(Duration(length, unit))
/**
* Create timeout from Java duration
* @param duration - Java Duration
* @return Timeout instance
*/
def create(duration: java.time.Duration): Timeout =
new Timeout(Duration.fromNanos(duration.toNanos))
/**
* Implicit conversion from Duration
* @param duration - Duration to convert
* @return Timeout instance
*/
implicit def durationToTimeout(duration: Duration): Timeout =
new Timeout(duration)
}

Usage Examples:
import akka.util.Timeout
import scala.concurrent.duration._
// Creating timeouts
// Only one implicit Timeout may be visible in a scope: with several, the
// `actor ? "request"` call below fails to compile with ambiguous implicit values.
implicit val timeout1: Timeout = Timeout(5.seconds)
// Alternative constructions, kept non-implicit so they do not compete with timeout1
val timeout2: Timeout = Timeout(3000, java.util.concurrent.TimeUnit.MILLISECONDS)
val timeout3: Timeout = Timeout.create(java.time.Duration.ofSeconds(10))
// Using with ask pattern
val future = actor ? "request" // Uses implicit timeout
// Explicit timeout
val explicitFuture = actor.ask("request")(Timeout(2.seconds))
// Java API
val javaTimeout = Timeout.create(java.time.Duration.ofMinutes(1))

Utilities for retrying failed operations with configurable strategies.
/**
* Retry support utilities
*/
trait RetrySupport {
/**
* Retry async operation with backoff
* @param operation - Operation to retry
* @param maxRetries - Maximum retry attempts
* @param initialDelay - Initial delay between retries
* @param backoffFactor - Exponential backoff factor
* @param maxDelay - Maximum delay between retries
* @return Future[T] with retry logic
*/
def retry[T](
operation: () => Future[T],
maxRetries: Int,
initialDelay: Duration,
backoffFactor: Double = 2.0,
maxDelay: Duration = Duration.Inf
)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T]
}

/**
* Exception thrown when ask times out
*/
// Primary constructor takes the final message text and passes it to TimeoutException.
class AskTimeoutException(message: String) extends TimeoutException(message) {
// Convenience constructor: builds the message text from the recipient, the
// timeout that elapsed, and the sender of the original message.
def this(sender: ActorRef, recipient: ActorRef, timeout: Duration) =
this(s"Ask timed out on [$recipient] after [$timeout]. Message from [$sender]")
}
/**
* Exception thrown when circuit breaker is open.
*
* Unchecked (extends RuntimeException); the message text is supplied by the
* code that raises it.
* @param message - Description passed through to RuntimeException
*/
class CircuitBreakerOpenException(message: String) extends RuntimeException(message)
/**
 * Status messages for operation results.
 *
 * Groups the two possible outcomes under one sealed, serializable supertype.
 */
object Status {

  /** Common supertype of both outcomes; sealed, so Success and Failure are the only variants. */
  sealed trait Status extends Serializable

  /**
   * Success status wrapper.
   * @param status - The successful result, carried as an untyped value
   */
  final case class Success(status: Any) extends Status

  /**
   * Failure status wrapper.
   * @param cause - The Throwable describing what went wrong
   */
  final case class Failure(cause: Throwable) extends Status
}
/**
* Future timeout support.
*
* Declares a single operation that takes an existing Future and returns a
* Future of the same element type.
*/
trait FutureTimeoutSupport {
/**
* Add timeout to Future
* @param future - Future to timeout
* @param timeout - Timeout duration
* @param ec - Implicit execution context
* @param scheduler - Implicit scheduler
* @return Future[T] that fails with TimeoutException if not completed in time
*/
def after[T](future: Future[T], timeout: Duration)
(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T]
}