or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

actor-behavior.md actor-lifecycle.md actor-system.md events-logging.md index.md io.md messaging.md routing.md supervision.md utilities.md
tile.json

docs/messaging.md

Message Passing and Communication

Core message passing primitives, the ask pattern, and various messaging patterns for actor communication. This covers synchronous and asynchronous communication, message forwarding, and advanced interaction patterns.

Capabilities

Basic Message Sending

Core tell (!) operation for asynchronous message passing between actors.

/**
 * Basic message sending operations on ActorRef
 *
 * This is a signature listing: every member is declared without a body.
 */
abstract class ActorRef {
  /**
   * Send a message asynchronously (Java API)
   *
   * @param msg    the message to send
   * @param sender the reference to present as the sender of `msg`
   */
  def tell(msg: Any, sender: ActorRef): Unit
  
  /**
   * Send a message asynchronously (Scala operator)
   *
   * @param message the message to send
   * @param sender  implicit sender reference; defaults to `Actor.noSender`
   *                when no implicit `ActorRef` is in scope
   */
  def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit
  
  /**
   * Forward a message preserving original sender
   *
   * @param message the message to forward
   * @param context implicit `ActorContext`; presumably that of the actor doing
   *                the forwarding — TODO confirm
   */
  def forward(message: Any)(implicit context: ActorContext): Unit
  
  /**
   * Compare actor references
   *
   * @param other the reference to compare against
   * @return an `Int`; the sign convention is not shown here (presumably that
   *         of `Comparable` — TODO confirm)
   */
  def compareTo(other: ActorRef): Int
  
  /** Get the actor path */
  def path: ActorPath
}

/** Companion-level members of `Actor` that are relevant to message sending. */
object Actor {
  /**
   * Special reference representing no sender
   *
   * It is the default value of the implicit `sender` parameter of `ActorRef.!`.
   * Declared here as a signature only; no value is shown.
   */
  val noSender: ActorRef
}

Usage Examples:

import akka.actor.{Actor, ActorRef}

/**
 * Example actor showing the different ways of sending a message:
 * `!` with the implicit sender, `tell` with an explicit or absent sender,
 * and `forward`.
 */
class MessageSender extends Actor {
  def receive: Receive = {
    case "forward" =>
      // Forwarding keeps the original sender of the message being handled
      context
        .actorSelection("/user/target")
        .forward("Forwarded message")

    case target: ActorRef =>
      // Implicit sender: this actor
      target ! "Hello from sender"

      // Sender passed explicitly
      target.tell("Hello explicit", self)

      // No sender at all
      target.tell("Anonymous message", Actor.noSender)
  }
}

/** Example actor that logs every String it receives and replies to its sender. */
class MessageReceiver extends Actor {
  def receive: Receive = {
    case text: String =>
      val replyTo = sender()
      println(s"Received: $text from ${replyTo}")
      // Answer whoever sent the message
      replyTo ! s"Reply to: $text"
  }
}

Ask Pattern (Request-Response)

The ask pattern enables request-response communication: asking an actor returns a Future that is completed with the actor's reply, subject to an implicit Timeout.

/**
 * Ask pattern support from akka.pattern
 *
 * Adds `ask` and its operator alias `?` to `ActorRef` through an implicit
 * value class.
 */
object AskSupport {
  /** Ask an actor and get a Future response */
  implicit class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
    /**
     * Send `message` and return a Future for the reply.
     *
     * The body is the `???` placeholder: this listing documents the signature
     * only and is not an implementation.
     *
     * @param message the request to send
     * @param timeout implicit timeout for the request
     */
    def ask(message: Any)(implicit timeout: Timeout): Future[Any] = ???
    /** Operator alias: delegates directly to [[ask]]. */
    def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)
  }
}

/**
 * Timeout wrapper for ask operations
 *
 * Wraps a single `FiniteDuration`. The members below are listed as signatures
 * only, so their exact semantics are not shown here.
 * NOTE(review): confirm that these operators exist on the `Timeout` class of
 * the Akka version being documented.
 */
final case class Timeout(duration: FiniteDuration) {
  /** Presumably scales the timeout by `factor` — TODO confirm. */
  def *(factor: Double): Timeout
  /** Presumably adds the duration of `other` to this one — TODO confirm. */
  def +(other: Timeout): Timeout  
  /** Presumably the shorter of this timeout and `other` — TODO confirm. */
  def min(other: Timeout): Timeout
  /** Presumably the longer of this timeout and `other` — TODO confirm. */
  def max(other: Timeout): Timeout
}

object Timeout {
  /**
   * Lets a `FiniteDuration` be supplied wherever a `Timeout` is expected,
   * e.g. `implicit val timeout: Timeout = 5.seconds`.
   */
  implicit def durationToTimeout(duration: FiniteDuration): Timeout =
    new Timeout(duration)
}

/**
 * Exception thrown when ask times out
 *
 * Carries a single message that is passed on to `AkkaException`.
 * NOTE(review): verify the constructor parameters and the superclass against
 * the API documentation of the Akka version in use.
 *
 * @param message description of the timeout
 */
final case class AskTimeoutException(message: String) extends AkkaException(message)

Usage Examples:

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.Future

/**
 * Example actor answering ("add" | "multiply" | "divide", x, y) tuples.
 * A division by zero is answered with a `Status.Failure`.
 */
class CalculatorActor extends Actor {
  def receive: Receive = {
    case ("add", x: Int, y: Int) =>
      reply(x + y)
    case ("multiply", x: Int, y: Int) =>
      reply(x * y)
    case ("divide", x: Int, y: Int) if y != 0 =>
      reply(x / y)
    case ("divide", _, 0) =>
      reply(akka.actor.Status.Failure(new ArithmeticException("Division by zero")))
  }

  /** Sends `answer` to the sender of the message currently being processed. */
  private def reply(answer: Any): Unit = sender() ! answer
}

// Using ask pattern
val system = ActorSystem("AskExample")
val calculator = system.actorOf(Props[CalculatorActor](), "calculator")

implicit val timeout: Timeout = 5.seconds
import system.dispatcher
import scala.util.{Failure, Success}

// Ask for a result. The tuple is wrapped in its own parentheses so it is
// passed as one message instead of relying on argument auto-tupling.
val resultFuture: Future[Any] = calculator ? (("add", 10, 20))
resultFuture.foreach { result =>
  println(s"Result: $result") // Result: 30
}

// Handle both outcomes of the request.
// A Status.Failure reply does not arrive as a value: ask fails the Future with
// the wrapped exception, so it has to be handled on the failure side.
// Future.foreach only sees successful values and would never reach it.
val divisionFuture = calculator ? (("divide", 10, 2))
divisionFuture.onComplete {
  case Success(result: Int) => println(s"Division result: $result")
  // Keep the match exhaustive so an unexpected reply cannot raise a MatchError
  case Success(other)       => println(s"Unexpected reply: $other")
  // Covers Status.Failure replies as well as AskTimeoutException
  case Failure(ex)          => println(s"Error: ${ex.getMessage}")
}

Pipe Pattern

The pipe pattern sends the result of a Future to an actor as a message once the Future completes.

/**
 * Pipe pattern support from akka.pattern
 *
 * Signature listing: the methods are declared without bodies.
 */
object PipeToSupport {
  /** Pipeable Future operations */
  implicit class PipeableFuture[T](val future: Future[T]) extends AnyVal {
    /**
     * Send the outcome of `future` to `recipient` as a message.
     * The usage example receives a piped failure as `akka.actor.Status.Failure`.
     *
     * @param recipient the actor that receives the outcome
     * @param sender    implicit reference presented as the sender
     * @return a `Future[T]`; presumably the piped future itself — TODO confirm
     */
    def pipeTo(recipient: ActorRef)(implicit sender: ActorRef): Future[T]
    /**
     * Same as [[pipeTo]], but the recipient is an `ActorSelection`.
     *
     * @param recipient the selection that receives the outcome
     * @param sender    implicit reference presented as the sender
     */
    def pipeToSelection(recipient: ActorSelection)(implicit sender: ActorRef): Future[T]
  }
}

Usage Examples:

import akka.actor.{Actor, ActorRef}
import akka.pattern.pipe
import scala.concurrent.Future
import scala.util.{Success, Failure}

/**
 * Example actor that starts asynchronous work and pipes the outcome either
 * back to the requester or to itself.
 */
class DatabaseActor extends Actor {
  import context.dispatcher

  def receive: Receive = {
    case "fetch-user" =>
      // Outcome goes back to whoever asked
      fetchUser().pipeTo(sender())

    case "process-data" =>
      // Outcome comes back to this actor for further processing
      processData().pipeTo(self)

    case result: String =>
      println(s"Received piped result: $result")

    case akka.actor.Status.Failure(ex) =>
      println(s"Received failure: ${ex.getMessage}")
  }

  /** Simulated database call. */
  private def fetchUser(): Future[String] =
    Future {
      Thread.sleep(100)
      "User: John Doe"
    }

  /** Simulated heavy computation. */
  private def processData(): Future[String] =
    Future {
      Thread.sleep(200)
      "Processed data"
    }
}

Message Patterns and Types

Standard message types and patterns for actor communication.

/**
 * Status wrapper messages
 *
 * The usage examples send and match these as `akka.actor.Status.Success` and
 * `akka.actor.Status.Failure`.
 */
object Status {
  /**
   * Success status with result
   *
   * @param status the result value being reported
   */
  final case class Success(status: Any)
  
  /**
   * Failure status with exception
   *
   * @param cause the exception describing the failure
   */  
  final case class Failure(cause: Throwable)
}

/**
 * Identity query and response messages
 *
 * `Identify` is the query. In the usage example it is sent to an actor
 * selection and answered with an `ActorIdentity`.
 *
 * @param messageId caller-chosen id; the usage example matches the same value
 *                  as `correlationId` of the `ActorIdentity` reply
 */
final case class Identify(messageId: Any) extends AutoReceivedMessage with NotInfluenceReceiveTimeout

/**
 * Reply to an `Identify` query.
 *
 * @param correlationId the id that was given in the `Identify` message
 * @param ref           `Some(actorRef)` when an actor was found, `None`
 *                      otherwise (as matched in the usage example)
 */
final case class ActorIdentity(correlationId: Any, ref: Option[ActorRef])

/**
 * Unhandled message notification
 *
 * @param message   the message that was not handled
 * @param sender    the sender of that message
 * @param recipient the actor that did not handle it
 */
final case class UnhandledMessage(message: Any, sender: ActorRef, recipient: ActorRef)

/**
 * Message traits for special handling
 *
 * All of these are member-less marker traits; what each one triggers is not
 * shown in this listing.
 */
// Mixed into `Identify`.
trait AutoReceivedMessage
// NOTE(review): purpose not shown here — confirm against the Akka API docs.
trait PossiblyHarmful  
// Mixed into `Identify`; the name suggests such messages leave the receive
// timeout untouched — TODO confirm.
trait NotInfluenceReceiveTimeout
// Type of the message carried by `SuppressedDeadLetter`.
trait DeadLetterSuppression
// Marker for messages that wrap another message; see `WrappedMessage.unwrap`.
trait WrappedMessage

/** Helpers for messages marked with the `WrappedMessage` trait. */
object WrappedMessage {
  /**
   * Unwrap a wrapped message
   *
   * @param message a message that may be a wrapper
   * @return an `Any`; presumably the innermost wrapped message, or `message`
   *         itself when it is not a wrapper — TODO confirm
   */
  def unwrap(message: Any): Any
}

Usage Examples:

import akka.actor.{Actor, Identify, ActorIdentity}

/**
 * Example actor that looks up `/user/target` with an `Identify` query and
 * reacts to the `ActorIdentity` reply as well as to `Status` messages.
 */
class IdentityActor extends Actor {
  /** Correlation id shared by the query and the expected replies. */
  private val LookupId = "lookup-id"

  def receive: Receive = {
    case "find-actor" =>
      context.actorSelection("/user/target") ! Identify(LookupId)

    case ActorIdentity(LookupId, Some(found)) =>
      println(s"Found actor: ${found.path}")
      found ! "Hello found actor"

    case ActorIdentity(LookupId, None) =>
      println("Actor not found")

    case akka.actor.Status.Success(outcome) =>
      println(s"Operation succeeded: $outcome")

    case akka.actor.Status.Failure(cause) =>
      println(s"Operation failed: ${cause.getMessage}")
  }
}

Dead Letters

Dead letter handling for undeliverable messages.

/**
 * Dead letter types and handling
 *
 * `AllDeadLetters` is the common supertype of the three case classes below.
 */
trait AllDeadLetters

/**
 * A message that could not be delivered.
 *
 * @param message   the undelivered message
 * @param sender    the sender of the message
 * @param recipient the intended recipient
 */
final case class DeadLetter(
  message: Any,
  sender: ActorRef, 
  recipient: ActorRef
) extends AllDeadLetters

/**
 * Dead letter whose message is marked with `DeadLetterSuppression`.
 *
 * @param message   the undelivered message
 * @param sender    the sender of the message
 * @param recipient the intended recipient
 */
final case class SuppressedDeadLetter(
  message: DeadLetterSuppression,
  sender: ActorRef,
  recipient: ActorRef  
) extends AllDeadLetters

/**
 * A message that was dropped, together with the reason.
 *
 * @param message   the dropped message
 * @param reason    textual reason for dropping it
 * @param sender    the sender of the message
 * @param recipient the intended recipient
 */
final case class Dropped(
  message: Any,
  reason: String,
  sender: ActorRef,
  recipient: ActorRef
) extends AllDeadLetters

Usage Examples:

import akka.actor.{Actor, DeadLetter, ActorSystem}

/**
 * Example listener that prints every `DeadLetter` and `Dropped` event it
 * receives. It has to be subscribed to those classes on the event stream.
 */
class DeadLetterListener extends Actor {
  // `Dropped` is matched below but is not in the snippet's import list, so it
  // is brought into scope here to keep the example compilable.
  import akka.actor.Dropped

  def receive: Receive = {
    // The fields are bound as `from`/`to` so they do not shadow `Actor.sender()`
    case DeadLetter(msg, from, to) =>
      println(s"Dead letter: $msg from $from to $to")

    case d: Dropped =>
      println(s"Dropped message: ${d.message}, reason: ${d.reason}")
  }
}

// `Props` and `Dropped` are used below but are not in the snippet's import
// list, so they are imported here to keep the example compilable.
import akka.actor.{Dropped, Props}

// Subscribe to dead letters
val system = ActorSystem("DeadLetterExample")
val listener = system.actorOf(Props[DeadLetterListener]())
// One subscription per event class the listener handles
system.eventStream.subscribe(listener, classOf[DeadLetter])
system.eventStream.subscribe(listener, classOf[Dropped])

Request-Response Patterns

Advanced request-response patterns including status replies and correlation.

/**
 * Status reply pattern from akka.pattern
 *
 * Simplified model of the reply types: a reply is either a `Success` carrying
 * a value or an `Error` carrying a `Throwable`. An error can be created from
 * an exception or from a plain message. There is a single `Error` class, so
 * the pattern `StatusReply.Error(error)` always binds a `Throwable`.
 *
 * NOTE(review): the real Akka API wraps textual errors in
 * `StatusReply.ErrorMessage`; a plain `RuntimeException` stands in for it
 * here — confirm against the API docs of the Akka version in use.
 */
object StatusReply {
  /**
   * Successful response
   *
   * @param value the result carried by the reply
   */
  final case class Success[T](value: T)

  /**
   * Error response
   *
   * @param error the cause of the failure
   */
  final case class Error[T](error: Throwable)

  object Error {
    /**
     * Create an error response from a textual description.
     * The text becomes the message of the wrapped exception.
     */
    def apply[T](errorMessage: String): Error[T] =
      new Error[T](new RuntimeException(errorMessage))
  }

  /** Create success reply */
  def success[T](value: T): Success[T] = Success(value)

  /** Create error reply from an exception */
  def error[T](error: Throwable): Error[T] = Error[T](error)

  /** Create error reply from a textual description */
  def error(errorMessage: String): Error[String] = Error[String](errorMessage)
}

Usage Examples:

import akka.actor.{Actor, ActorRef}
import akka.pattern.StatusReply

/**
 * Example service that answers ("validate", data) and ("process", data)
 * requests with a `StatusReply`.
 */
class ServiceActor extends Actor {
  def receive: Receive = {
    case ("validate", data: String) =>
      val replyTo = sender()
      if (data.isEmpty) replyTo ! StatusReply.error("Data cannot be empty")
      else replyTo ! StatusReply.success("Valid data")

    case ("process", data: Any) =>
      val replyTo = sender()
      // The send stays inside the try so failures are reported exactly as before
      try replyTo ! StatusReply.success(processData(data))
      catch {
        case ex: Exception => replyTo ! StatusReply.error(ex)
      }
  }

  /** Simulated processing step. */
  private def processData(data: Any): String =
    s"Processed: $data"
}

/**
 * Example client: on "start" it sends a validation request to
 * `/user/service`, then prints whichever `StatusReply` comes back.
 */
class ClientActor extends Actor {
  def receive: Receive = {
    case StatusReply.Error(error) =>
      println(s"Error: $error")

    case StatusReply.Success(value) =>
      println(s"Success: $value")

    case "start" =>
      val request = ("validate", "test-data")
      context.actorSelection("/user/service") ! request
  }
}

Message Ordering and Delivery

Message ordering guarantees and delivery semantics in Akka.

/**
 * Message delivery guarantees:
 * - At-most-once delivery (default)
 * - Message ordering per sender-receiver pair
 * - No guaranteed delivery without acknowledgment
 */

/**
 * Reliable delivery patterns require explicit acknowledgment
 *
 * `Ack` confirms receipt of the `Message` that has the same `id`.
 *
 * @param id id of the message being acknowledged
 */
case class Ack(id: Long)
// Envelope pairing a payload with the id that the receiver echoes in its `Ack`.
case class Message(id: Long, payload: Any)

/**
 * At-least-once delivery using persistence (akka-persistence)
 *
 * Signature listing: the members are declared without bodies.
 */
trait AtLeastOnceDelivery {
  /**
   * Deliver a message to `destination`.
   *
   * @param destination         path of the receiving actor
   * @param deliveryIdToMessage builds the message from a `Long` delivery id
   */
  def deliver(destination: ActorPath)(deliveryIdToMessage: Long => Any): Unit
  /**
   * Confirm the delivery with the given id.
   *
   * @return a `Boolean`; presumably `true` when the delivery was still
   *         pending — TODO confirm
   */
  def confirmDelivery(deliveryId: Long): Boolean
  /** Returns an `AtLeastOnceDeliverySnapshot`; its contents are not shown here. */
  def getDeliverySnapshot: AtLeastOnceDeliverySnapshot
}

Usage Examples:

import akka.actor.Actor

/**
 * Example of acknowledgment-based delivery.
 *
 * Every outgoing payload is wrapped in a `Message` with a fresh id and is
 * remembered until the matching `Ack` arrives. A "retry" message re-sends
 * everything that is still unacknowledged.
 */
class ReliableActor extends Actor {
  // `ActorRef` is used below but the snippet imports only `Actor`, so it is
  // brought into scope here to keep the example compilable.
  import akka.actor.ActorRef

  /** Id assigned to the next outgoing message. */
  var nextId = 0L

  /** Messages sent but not yet acknowledged, keyed by message id. */
  var pendingMessages = Map.empty[Long, (ActorRef, Any)]

  def receive: Receive = {
    case (recipient: ActorRef, payload: Any) =>
      val id = nextId
      nextId += 1
      // `updated` takes the pair explicitly instead of relying on the argument
      // auto-tupling in `id -> (recipient, payload)`
      pendingMessages = pendingMessages.updated(id, (recipient, payload))
      recipient ! Message(id, payload)

    case Ack(id) =>
      pendingMessages -= id
      println(s"Message $id acknowledged")

    case "retry" =>
      // Re-send with the original id so the receiver's Ack still matches
      pendingMessages.foreach { case (id, (recipient, payload)) =>
        recipient ! Message(id, payload)
      }
  }
}

/** Example receiver that prints each payload and acknowledges it by id. */
class AcknowledgingActor extends Actor {
  def receive: Receive = {
    case Message(messageId, body) =>
      println(s"Received: $body")
      // Confirm receipt to the sender
      val confirmation = Ack(messageId)
      sender() ! confirmation
  }
}