or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

async-testing.md, configuration.md, index.md, logging.md, sync-testing.md, test-probes.md, time-control.md
tile.json

docs/test-probes.md

Test Probes

Flexible message testing with timeouts, selective filtering, and assertion utilities for asynchronous actor testing. TestProbe acts as a controllable actor that can be queried for received messages.

Capabilities

TestProbe

Main API for asynchronous message testing and assertion with flexible timeout and filtering capabilities.

/**
 * Factory methods for creating TestProbe instances.
 *
 * Both overloads take the ActorSystem as an implicit parameter in a
 * separate parameter list.
 */
object TestProbe {
  /**
   * Create anonymous test probe.
   *
   * @tparam M Type of message the probe receives
   * @param system Implicit actor system needed to create the probe
   * @return A new test probe for messages of type M
   */
  def apply[M]()(implicit system: ActorSystem[_]): TestProbe[M]
  /**
   * Create named test probe.
   *
   * @tparam M Type of message the probe receives
   * @param name Name given to the probe
   * @param system Implicit actor system needed to create the probe
   * @return A new test probe for messages of type M
   */
  def apply[M](name: String)(implicit system: ActorSystem[_]): TestProbe[M]
}

/**
 * Test probe for asynchronous message testing and assertion.
 *
 * The probe acts as a controllable actor: messages sent to `ref` can be
 * queried and asserted on through the methods below.
 *
 * @tparam M Type of message the probe receives
 */
trait TestProbe[M] {
  /** ActorRef for this TestProbe; send messages here to have the probe receive them */
  def ref: ActorRef[M]
  
  /** Time remaining for execution of innermost enclosing `within` block, or the default if there is no such block */
  def remainingOrDefault: FiniteDuration
  /** Time remaining for execution of innermost enclosing `within` block; throws if there is no such block */
  def remaining: FiniteDuration
  /** Time remaining for execution of innermost enclosing `within` block, or `duration` if there is no such block */
  def remainingOr(duration: FiniteDuration): FiniteDuration
  
  /** Execute code block while bounding execution time between `min` (not dilated) and `max` (dilated) */
  def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
  /** Execute code block within the `max` time bound (dilated) */
  def within[T](max: FiniteDuration)(f: => T): T
  
  /** Expect specific message with default timeout; returns the received message if it matches */
  def expectMessage[T <: M](obj: T): T
  /** Expect specific message with custom timeout `max`; returns the received message if it matches */
  def expectMessage[T <: M](max: FiniteDuration, obj: T): T
  /** Expect specific message with custom timeout `max` and a `hint` (presumably shown on failure -- confirm) */
  def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T
  
  /** Assert no message received for specified time; `max` is not dilated */
  def expectNoMessage(max: FiniteDuration): Unit
  /** Assert no message received for default period */
  def expectNoMessage(): Unit
  
  /** Expect message of type `T` with default timeout; returns the message cast to `T` */
  def expectMessageType[T <: M](implicit t: ClassTag[T]): T
  /** Expect message of type `T` with custom timeout `max`; returns the message cast to `T` */
  def expectMessageType[T <: M](max: FiniteDuration)(implicit t: ClassTag[T]): T
  
  /** Receive any message with default timeout */
  def receiveMessage(): M
  /** Receive any message with custom timeout */
  def receiveMessage(max: FiniteDuration): M
  
  /** Receive `n` messages with default timeout */
  def receiveMessages(n: Int): immutable.Seq[M]
  /** Receive `n` messages with custom timeout */
  def receiveMessages(n: Int, max: FiniteDuration): immutable.Seq[M]
  
  /** Selective message collection: `fisher` returns a FishingOutcome for each message; returns the collected messages */
  def fishForMessage(max: FiniteDuration)(fisher: M => FishingOutcome): immutable.Seq[M]
  /** Selective message collection with fishing function and hint */
  def fishForMessage(max: FiniteDuration, hint: String)(fisher: M => FishingOutcome): immutable.Seq[M]
  /** Selective message collection where `fisher` is a partial function */
  def fishForMessagePF(max: FiniteDuration)(fisher: PartialFunction[M, FishingOutcome]): immutable.Seq[M]
  /** Selective message collection with partial function and hint */
  def fishForMessagePF(max: FiniteDuration, hint: String)(fisher: PartialFunction[M, FishingOutcome]): immutable.Seq[M]
  
  /** Expect `actorRef` to terminate within the default timeout */
  def expectTerminated[U](actorRef: ActorRef[U]): Unit
  /** Expect `actorRef` to terminate within the custom timeout `max` */
  def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit
  
  /** Retry assertion `a` until success with default timeout and interval; returns the result of the successful assertion */
  def awaitAssert[A](a: => A): A
  /** Retry assertion `a` until success with custom timeout `max` */
  def awaitAssert[A](a: => A, max: FiniteDuration): A
  /** Retry assertion `a` until success with custom timeout `max`, waiting `interval` between attempts */
  def awaitAssert[A](a: => A, max: FiniteDuration, interval: FiniteDuration): A
  
  /** Stop the test probe (useful for testing watch/termination) */
  def stop(): Unit
}

Usage Examples:

import akka.actor.testkit.typed.scaladsl.{ActorTestKit, TestProbe}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._

// Test kit used to create the probe and spawn the actor
val testKit = ActorTestKit()

// Probe that receives the String replies
val probe = testKit.createTestProbe[String]()

// Spawn an actor that forwards every incoming message to the probe, prefixed with "Echo: "
val sender = testKit.spawn(
  Behaviors.receiveMessage[String] { msg =>
    probe.ref.tell(s"Echo: $msg")
    Behaviors.same
  }
)

// Plain expectation, relying on the default timeout
sender.tell("Hello")
probe.expectMessage("Echo: Hello")

// Same exchange, bounded by an explicit time limit
probe.within(1.second) {
  sender.tell("World")
  probe.expectMessage("Echo: World")
}

// Nothing else should arrive during the next 100 ms
probe.expectNoMessage(100.millis)

// Shut the test kit down once the checks are finished
testKit.shutdownTestKit()

Message Expectation and Assertion

Core functionality for expecting and asserting on specific messages with various matching strategies.

/**
 * Expect specific message with default timeout
 * @tparam T Type of the expected message, a subtype of the probe's message type M
 * @param obj Expected message object
 * @return The received message if it matches
 */
def expectMessage[T <: M](obj: T): T

/**
 * Expect specific message with custom timeout
 * @tparam T Type of the expected message, a subtype of the probe's message type M
 * @param max Maximum wait time
 * @param obj Expected message object
 * @return The received message if it matches
 */
def expectMessage[T <: M](max: FiniteDuration, obj: T): T

/**
 * Expect message of specific type using ClassTag, with default timeout
 * @tparam T Expected message type
 * @param t Implicit ClassTag identifying the expected type
 * @return The received message cast to the expected type
 */
def expectMessageType[T <: M](implicit t: ClassTag[T]): T

/**
 * Expect message of specific type with custom timeout
 * @tparam T Expected message type
 * @param max Maximum wait time
 * @param t Implicit ClassTag identifying the expected type
 * @return The received message cast to the expected type
 */
def expectMessageType[T <: M](max: FiniteDuration)(implicit t: ClassTag[T]): T

/**
 * Assert that no messages are received within the specified time
 * @param max Time to wait for messages (not dilated)
 */
def expectNoMessage(max: FiniteDuration): Unit

/**
 * Assert that no messages are received for the default period
 */
def expectNoMessage(): Unit

Usage Examples:

import akka.actor.testkit.typed.scaladsl.TestProbe
import scala.concurrent.duration._ // duration syntax such as `100.millis` used below

// NOTE: `testKit` and `someActor` are assumed to be in scope, and `someActor`
// is assumed to send its replies to `probe.ref`; neither is shown in this snippet.

// Reply type used by the type-based expectation below
case class Response(data: String)

val probe = testKit.createTestProbe[Any]()

// Expect specific string message
someActor ! "request"
probe.expectMessage("response")

// Expect message type; the result is already typed as Response
someActor ! "get-data"
val response = probe.expectMessageType[Response]
assert(response.data == "some data")

// Assert no messages for 100ms
probe.expectNoMessage(100.millis)

Message Collection and Filtering

Advanced message collection with selective filtering using fishing patterns for complex message scenarios.

/**
 * Receive any message of type M with default timeout
 * @return The received message
 */
def receiveMessage(): M

/**
 * Receive multiple messages in sequence with default timeout
 * @param n Number of messages to receive
 * @return Sequence of received messages
 */
def receiveMessages(n: Int): immutable.Seq[M]

/**
 * Selective message collection using fishing function
 * @param max Maximum time to fish for messages
 * @param fisher Function returning FishingOutcome for each message; the
 *               outcome decides whether the message is collected and
 *               whether fishing continues
 * @return Sequence of collected messages
 */
def fishForMessage(max: FiniteDuration)(fisher: M => FishingOutcome): immutable.Seq[M]

/**
 * Selective message collection using partial function
 * @param max Maximum time to fish for messages
 * @param fisher Partial function returning FishingOutcome
 * @return Sequence of collected messages
 */
def fishForMessagePF(max: FiniteDuration)(fisher: PartialFunction[M, FishingOutcome]): immutable.Seq[M]

Usage Examples:

import akka.actor.testkit.typed.scaladsl.{TestProbe, FishingOutcomes}
import scala.concurrent.duration._ // duration syntax such as `3.seconds` used below

// NOTE: `testKit` and `actor` are assumed to be in scope, and `actor` is
// assumed to send its responses to `probe.ref`; neither is shown in this snippet.

val probe = testKit.createTestProbe[String]()

// Send multiple messages to actor that will respond
actor ! "msg1"
actor ! "msg2"
actor ! "done"

// Collect all messages until "done"
val messages = probe.fishForMessage(3.seconds) {
  case "done" => FishingOutcomes.complete
  case _      => FishingOutcomes.continue
}

// Collect only specific messages
val filtered = probe.fishForMessagePF(3.seconds) {
  case msg if msg.startsWith("important") => FishingOutcomes.continue
  case "stop" => FishingOutcomes.complete
  case _ => FishingOutcomes.continueAndIgnore // Don't collect this message
}

FishingOutcomes

Control values for directing fishing behavior during selective message collection.

/**
 * Factory for FishingOutcome values.
 *
 * A fishing function passed to `fishForMessage` / `fishForMessagePF`
 * returns one of these for each message it is given.
 */
object FishingOutcomes {
  /** Complete fishing and return all collected messages */
  val complete: FishingOutcome
  /** Continue fishing and collect this message */
  val continue: FishingOutcome  
  /** Continue fishing but don't collect this message */
  val continueAndIgnore: FishingOutcome
  /**
   * Fail fishing with custom error message
   * @param message Error message describing the failure
   * @return Outcome that fails the fishing
   */
  def fail(message: String): FishingOutcome
}

/**
 * Outcome returned by a fishing function for each message it inspects.
 *
 * The trait is sealed, so every variant is declared in the companion below
 * and pattern matches over outcomes can be checked for exhaustiveness.
 */
sealed trait FishingOutcome
object FishingOutcome {
  /** Fishing is complete (cf. `FishingOutcomes.complete`) */
  case object Complete extends FishingOutcome
  /** Keep fishing (cf. `FishingOutcomes.continue`) */
  case object Continue extends FishingOutcome
  /** Keep fishing, ignoring the current message (cf. `FishingOutcomes.continueAndIgnore`) */
  case object ContinueAndIgnore extends FishingOutcome
  /**
   * Fishing failed.
   *
   * Declared `final`: case classes are not meant to be extended.
   *
   * @param message Custom error message describing the failure
   */
  final case class Fail(message: String) extends FishingOutcome
}

Time Management and Bounds

Utilities for controlling test execution timing and setting time bounds for operations.

/**
 * Execute code block while bounding execution time between min and max
 * @tparam T Result type of the code block
 * @param min Minimum execution time (not dilated)
 * @param max Maximum execution time (dilated)
 * @param f Code block to execute
 * @return Result of code block execution
 */
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T

/**
 * Execute code block within maximum time bound
 * @tparam T Result type of the code block
 * @param max Maximum execution time (dilated)
 * @param f Code block to execute
 * @return Result of code block execution
 */
def within[T](max: FiniteDuration)(f: => T): T

/**
 * Get time remaining for execution of innermost enclosing within block,
 * falling back to the default when there is no within block
 * @return Remaining time or default if no within block
 */
def remainingOrDefault: FiniteDuration

/**
 * Get time remaining for execution of innermost enclosing within block
 * @return Remaining time (throws if no within block)
 */
def remaining: FiniteDuration

/**
 * Get time remaining or specified duration if no within block
 * @param duration Fallback duration
 * @return Remaining time or fallback duration
 */
def remainingOr(duration: FiniteDuration): FiniteDuration

Actor Termination and Retry Utilities

Utilities for testing actor lifecycle and implementing retry logic for eventually consistent assertions.

/**
 * Expect actor to terminate within default timeout
 * @tparam U Message type of the watched actor's ref
 * @param actorRef Actor to watch for termination
 */
def expectTerminated[U](actorRef: ActorRef[U]): Unit

/**
 * Expect actor to terminate within custom timeout
 * @tparam U Message type of the watched actor's ref
 * @param actorRef Actor to watch for termination
 * @param max Maximum time to wait for termination
 */
def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit

/**
 * Retry assertion until success with default timeout and interval
 * @tparam A Result type of the assertion
 * @param a Assertion to retry (by-name, so it is re-evaluated on each attempt)
 * @return Result of successful assertion
 */
def awaitAssert[A](a: => A): A

/**
 * Retry assertion until success with custom timeout
 * @tparam A Result type of the assertion
 * @param a Assertion to retry
 * @param max Maximum time to retry
 * @return Result of successful assertion
 */
def awaitAssert[A](a: => A, max: FiniteDuration): A

/**
 * Retry assertion until success with custom timeout and interval
 * @tparam A Result type of the assertion
 * @param a Assertion to retry
 * @param max Maximum time to retry
 * @param interval Time between retry attempts
 * @return Result of successful assertion
 */
def awaitAssert[A](a: => A, max: FiniteDuration, interval: FiniteDuration): A

/**
 * Stop the test probe (useful for testing watch/termination)
 */
def stop(): Unit

Usage Examples:

import akka.actor.testkit.typed.scaladsl.TestProbe
import scala.concurrent.duration._ // duration syntax such as `3.seconds` used below

// NOTE: `testKit`, `someBehavior`, `getEventualState` and `someWatchingActor`
// are assumed to be in scope; they are not shown in this snippet.

val probe = testKit.createTestProbe[String]()

// Test actor termination.
// Typed actors have no PoisonPill message (that belongs to classic actors),
// so the actor is stopped through the test kit that spawned it.
val actor = testKit.spawn(someBehavior)
testKit.stop(actor)
probe.expectTerminated(actor, 3.seconds)

// Retry assertion for eventually consistent behavior.
// The assertion block is the first argument of awaitAssert, so it has to sit
// inside the parentheses together with the named `max` and `interval` arguments.
probe.awaitAssert({
  // This assertion will be retried until it succeeds or timeout
  val state = getEventualState()
  assert(state.isReady)
}, max = 5.seconds, interval = 100.millis)

// Stop probe for termination testing
probe.stop()
someWatchingActor ! probe.ref // Expected to observe Terminated, assuming it watches the ref it receives