Akka Actor provides the foundational Actor Model implementation for building concurrent, distributed, resilient and elastic applications with supervision hierarchies and location transparency.
Essential patterns and utilities for Akka Actor including ask pattern, circuit breaker, graceful shutdown, and timeout handling.
Request-response pattern for obtaining Future-based replies from actors.
/**
* Ask pattern support trait
*/
trait AskSupport {
/**
* Send message and return Future of response
* @param actorRef - Target actor
* @param message - Message to send
* @param timeout - Request timeout
* @return Future[Any] containing response
*/
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any]
}
/**
* Implicit class adding ask capability to ActorRef
*/
final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
/**
* Ask operator - send message and return Future
* @param message - Message to send
* @param timeout - Implicit timeout
* @param sender - Optional sender (defaults to Actor.noSender)
* @return Future[Any] containing response
*/
def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any]
/**
* Ask method - send message and return Future
* @param message - Message to send
* @param timeout - Implicit timeout
* @param sender - Optional sender (defaults to Actor.noSender)
* @return Future[Any] containing response
*/
def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any]
}
/**
* Java API for ask pattern
*/
object Patterns {
/**
* Java ask method returning CompletionStage
* @param actor - Target actor
* @param message - Message to send
* @param timeout - Request timeout
* @return CompletionStage[AnyRef] containing response
*/
def ask(actor: ActorRef, message: Any, timeout: Timeout): CompletionStage[AnyRef]
/**
* Java ask with duration timeout
* @param actor - Target actor
* @param message - Message to send
* @param timeout - Request timeout duration (java.time.Duration)
* @return CompletionStage[AnyRef] containing response
*/
def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef]
}Usage Examples:
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.Future
// Scala ask pattern
implicit val timeout = Timeout(5.seconds)
val system = ActorSystem("AskExample")
class ResponseActor extends Actor {
def receive = {
case "ping" => sender() ! "pong"
case x: Int => sender() ! (x * 2)
case "fail" => throw new RuntimeException("Ask failure")
}
}
val actor = system.actorOf(Props[ResponseActor])
// Using ? operator
val future1: Future[Any] = actor ? "ping"
future1.foreach(println) // Prints: pong
// Using ask method
val future2: Future[Any] = ask(actor, 42)
future2.foreach(println) // Prints: 84
// Type casting response
val future3: Future[String] = (actor ? "ping").mapTo[String]
// Error handling
val future4: Future[Any] = actor ? "fail"
future4.recover {
case _: AskTimeoutException => "Request timed out"
case ex => s"Request failed: ${ex.getMessage}"
}
// Java API
import akka.pattern.Patterns
val javaFuture = Patterns.ask(actor, "ping", timeout)Pipeline Future results to actors for asynchronous response handling.
/**
* Pipe pattern support trait
*/
trait PipeToSupport {
/**
* Implicit class adding pipe capability to Future
*/
implicit class PipeableFuture[T](val future: Future[T]) extends AnyVal {
/**
* Send Future result to actor when complete
* @param recipient - Target actor
* @param sender - Implicit sender reference
* @return Original Future
*/
def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[T]
/**
* Send Future result to actor selection when complete
* @param recipient - Target actor selection
* @param sender - Implicit sender reference
* @return Original Future
*/
def pipeTo(recipient: ActorSelection)(implicit sender: ActorRef = Actor.noSender): Future[T]
}
}
/**
* Java API for pipe pattern
*/
object Patterns {
/**
* Pipe CompletionStage result to actor
* @param future - CompletionStage to pipe
* @param recipient - Target actor
* @param context - Execution context
* @return Original CompletionStage
*/
def pipe[T](
future: CompletionStage[T],
recipient: ActorRef
)(implicit context: ExecutionContext): CompletionStage[T]
}Usage Examples:
import akka.pattern.pipe
import scala.concurrent.Future
class PipeActor extends Actor {
import context.dispatcher
def receive = {
case "start" =>
// Start async operation and pipe result back to self
val futureResult: Future[String] = asyncOperation()
futureResult.pipeTo(self)
case result: String =>
println(s"Received async result: $result")
case Status.Failure(ex) =>
println(s"Async operation failed: $ex")
}
private def asyncOperation(): Future[String] = {
Future {
Thread.sleep(1000)
"Async operation completed"
}
}
}
// Pipe to different actor
class RequestActor extends Actor {
import context.dispatcher
def receive = {
case "process" =>
val originalSender = sender()
val futureResult = expensiveComputation()
futureResult.pipeTo(originalSender)
}
private def expensiveComputation(): Future[Int] = {
Future(42)
}
}Fault tolerance pattern preventing cascading failures in distributed systems.
/**
* Circuit breaker implementation for fault tolerance
*/
class CircuitBreaker(
scheduler: Scheduler,
maxFailures: Int,
callTimeout: Duration,
resetTimeout: Duration
) {
/**
* Execute operation with circuit breaker protection
* @param body - Async operation to protect
* @return Future[T] with circuit breaker semantics
*/
def withCircuitBreaker[T](body: => Future[T]): Future[T]
/**
* Execute synchronous operation with circuit breaker
* @param body - Sync operation to protect
* @return T with circuit breaker semantics
*/
def withSyncCircuitBreaker[T](body: => T): T
/**
* Java API - execute with circuit breaker
* @param body - Callable returning CompletionStage
* @return CompletionStage[T] with protection
*/
def callWithCircuitBreakerCS[T](body: Callable[CompletionStage[T]]): CompletionStage[T]
/**
* Register callback for circuit open state
* @param callback - Code to execute when circuit opens
* @return This CircuitBreaker for chaining
*/
def onOpen(callback: => Unit): CircuitBreaker
/**
* Register callback for circuit closed state
* @param callback - Code to execute when circuit closes
* @return This CircuitBreaker for chaining
*/
def onClose(callback: => Unit): CircuitBreaker
/**
* Register callback for circuit half-open state
* @param callback - Code to execute when circuit half-opens
* @return This CircuitBreaker for chaining
*/
def onHalfOpen(callback: => Unit): CircuitBreaker
/**
* Current failure count
*/
def currentFailureCount: Int
/**
* Whether circuit is currently open
*/
def isOpen: Boolean
/**
* Whether circuit is currently half-open
*/
def isHalfOpen: Boolean
/**
* Whether circuit is currently closed
*/
def isClosed: Boolean
}Usage Examples:
import akka.pattern.CircuitBreaker
import scala.concurrent.duration._
class ServiceActor extends Actor {
import context.dispatcher
// Configure circuit breaker
val breaker = new CircuitBreaker(
scheduler = context.system.scheduler,
maxFailures = 5,
callTimeout = 10.seconds,
resetTimeout = 1.minute
).onOpen {
println("Circuit breaker opened")
}.onClose {
println("Circuit breaker closed")
}.onHalfOpen {
println("Circuit breaker half-open")
}
def receive = {
case "call-service" =>
val originalSender = sender()
// Protect external service call
val protectedCall = breaker.withCircuitBreaker {
callExternalService()
}
protectedCall.pipeTo(originalSender)
case "sync-call" =>
try {
val result = breaker.withSyncCircuitBreaker {
expensiveBlockingOperation()
}
sender() ! result
} catch {
case _: CircuitBreakerOpenException =>
sender() ! "Service unavailable - circuit open"
}
}
private def callExternalService(): Future[String] = {
// Simulate external service call that might fail
Future {
if (math.random() < 0.3) throw new RuntimeException("Service error")
"Service response"
}
}
private def expensiveBlockingOperation(): String = {
Thread.sleep(1000)
"Blocking result"
}
}Controlled actor shutdown with optional stop message and timeout.
/**
* Graceful stop support
*/
trait GracefulStopSupport {
/**
* Stop actor gracefully with timeout
* @param actor - Actor to stop
* @param timeout - Maximum wait time
* @param stopMessage - Optional custom stop message
* @return Future[Boolean] - true if stopped within timeout
*/
def gracefulStop(
actor: ActorRef,
timeout: Duration,
stopMessage: Any = PoisonPill
): Future[Boolean]
}
/**
* Java API for graceful stop
*/
object Patterns {
/**
* Java graceful stop method
* @param actor - Actor to stop
* @param duration - Timeout duration
* @param stopMessage - Stop message to send
* @return CompletionStage[Boolean] indicating success
*/
def gracefulStop(
actor: ActorRef,
duration: Duration,
stopMessage: AnyRef
): CompletionStage[java.lang.Boolean]
}Usage Examples:
import akka.pattern.gracefulStop
import scala.concurrent.duration._
class GracefulActor extends Actor {
def receive = {
case "shutdown" =>
// Custom shutdown logic
println("Performing cleanup...")
context.stop(self)
case PoisonPill =>
// Handle poison pill
println("Graceful shutdown requested")
context.stop(self)
case msg =>
println(s"Processing: $msg")
}
}
// Using graceful stop
val actor = system.actorOf(Props[GracefulActor])
// Stop with default PoisonPill
val stopped1: Future[Boolean] = gracefulStop(actor, 5.seconds)
stopped1.foreach { success =>
if (success) println("Actor stopped gracefully")
else println("Actor stop timed out")
}
// Stop with custom message
val stopped2: Future[Boolean] = gracefulStop(actor, 3.seconds, "shutdown")Timeout value wrapper for ask pattern and other timed operations.
/**
* Timeout value wrapper
*/
case class Timeout(duration: Duration) {
/**
* Create timeout from length and time unit
* @param length - Timeout length
* @param unit - Time unit
* @return New Timeout
*/
def apply(length: Long, unit: TimeUnit): Timeout = Timeout(Duration(length, unit))
/**
* Convert to Java Duration
*/
def asJava: java.time.Duration = duration.asJava
}
object Timeout {
/**
* Create timeout from duration
* @param duration - Timeout duration
* @return Timeout instance
*/
def apply(duration: Duration): Timeout = new Timeout(duration)
/**
* Create timeout from length and unit
* @param length - Timeout length
* @param unit - Time unit
* @return Timeout instance
*/
def apply(length: Long, unit: TimeUnit): Timeout =
new Timeout(Duration(length, unit))
/**
* Create timeout from Java duration
* @param duration - Java Duration
* @return Timeout instance
*/
def create(duration: java.time.Duration): Timeout =
new Timeout(Duration.fromNanos(duration.toNanos))
/**
* Implicit conversion from Duration
* @param duration - Duration to convert
* @return Timeout instance
*/
implicit def durationToTimeout(duration: Duration): Timeout =
new Timeout(duration)
}Usage Examples:
import akka.util.Timeout
import scala.concurrent.duration._
// Creating timeouts
implicit val timeout1 = Timeout(5.seconds)
implicit val timeout2 = Timeout(3000, TimeUnit.MILLISECONDS)
implicit val timeout3 = Timeout.create(java.time.Duration.ofSeconds(10))
// Using with ask pattern
val future = actor ? "request" // Uses implicit timeout
// Explicit timeout
val explicitFuture = actor.ask("request")(Timeout(2.seconds))
// Java API
val javaTimeout = Timeout.create(java.time.Duration.ofMinutes(1))Utilities for retrying failed operations with configurable strategies.
/**
* Retry support utilities
*/
trait RetrySupport {
/**
* Retry async operation with backoff
* @param operation - Operation to retry
* @param maxRetries - Maximum retry attempts
* @param initialDelay - Initial delay between retries
* @param backoffFactor - Exponential backoff factor
* @param maxDelay - Maximum delay between retries
* @return Future[T] with retry logic
*/
def retry[T](
operation: () => Future[T],
maxRetries: Int,
initialDelay: Duration,
backoffFactor: Double = 2.0,
maxDelay: Duration = Duration.Inf
)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T]
}/**
* Exception thrown when ask times out
*/
class AskTimeoutException(message: String) extends TimeoutException(message) {
def this(sender: ActorRef, recipient: ActorRef, timeout: Duration) =
this(s"Ask timed out on [$recipient] after [$timeout]. Message from [$sender]")
}
/**
* Exception thrown when circuit breaker is open
*/
class CircuitBreakerOpenException(message: String) extends RuntimeException(message)
/**
* Status messages for operation results
*/
object Status {
/**
* Success status wrapper
*/
final case class Success(status: Any) extends Status
/**
* Failure status wrapper
*/
final case class Failure(cause: Throwable) extends Status
sealed trait Status extends Serializable
}
/**
* Future timeout support
*/
trait FutureTimeoutSupport {
/**
* Add timeout to Future
* @param future - Future to timeout
* @param timeout - Timeout duration
* @return Future[T] that fails with TimeoutException if not completed in time
*/
def after[T](future: Future[T], timeout: Duration)
(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T]
}Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-actor-2-11