Flexible message testing with timeouts, selective filtering, and assertion utilities for asynchronous actor testing. TestProbe acts as a controllable actor that can be queried for received messages.
Main API for asynchronous message testing and assertion with flexible timeout and filtering capabilities.
/**
* Factory methods for creating [[TestProbe]] instances.
*
* Both overloads take the probe's message type `M` as a type parameter and
* require an implicit `ActorSystem` to be in scope.
*/
object TestProbe {
/**
* Create an anonymous test probe.
*
* @tparam M Type of messages the probe accepts
* @param system Implicit actor system (presumably the system the probe runs in; confirm)
* @return A new probe for messages of type `M`
*/
def apply[M]()(implicit system: ActorSystem[_]): TestProbe[M]
/**
* Create a named test probe.
*
* @tparam M Type of messages the probe accepts
* @param name Name given to the probe
* @param system Implicit actor system (presumably the system the probe runs in; confirm)
* @return A new probe for messages of type `M`
*/
def apply[M](name: String)(implicit system: ActorSystem[_]): TestProbe[M]
}
/**
* Test probe for asynchronous message testing and assertion.
*
* A probe exposes an `ActorRef[M]` that other actors can send to, and offers
* blocking expectations, selective collection ("fishing"), timing bounds and
* retrying assertions on the messages it receives.
*
* @tparam M Type of messages the probe accepts
*/
trait TestProbe[M] {
/** ActorRef for this TestProbe; messages sent to it can be queried through the probe. */
def ref: ActorRef[M]
/** Time remaining for execution of the innermost enclosing `within` block, or the default if there is no such block. */
def remainingOrDefault: FiniteDuration
/** Time remaining for execution of the innermost enclosing `within` block; throws if there is no such block. */
def remaining: FiniteDuration
/** Time remaining for the innermost enclosing `within` block, or the given fallback `duration` if there is no such block. */
def remainingOr(duration: FiniteDuration): FiniteDuration
/** Execute code block while bounding execution time between `min` (not dilated) and `max` (dilated); returns the block's result. */
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
/** Execute code block within the `max` time bound (dilated); returns the block's result. */
def within[T](max: FiniteDuration)(f: => T): T
/** Expect a specific message with the default timeout; returns the received message if it matches. */
def expectMessage[T <: M](obj: T): T
/** Expect a specific message with a custom timeout; returns the received message if it matches. */
def expectMessage[T <: M](max: FiniteDuration, obj: T): T
/** Expect a specific message with a custom timeout and a hint (presumably shown in the failure message; confirm). */
def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T
/** Assert that no message is received for the specified time (`max` is not dilated). */
def expectNoMessage(max: FiniteDuration): Unit
/** Assert that no message is received for the default period. */
def expectNoMessage(): Unit
/** Expect a message of type `T` with the default timeout; returns the message cast to `T`. */
def expectMessageType[T <: M](implicit t: ClassTag[T]): T
/** Expect a message of type `T` with a custom timeout; returns the message cast to `T`. */
def expectMessageType[T <: M](max: FiniteDuration)(implicit t: ClassTag[T]): T
/** Receive any message with the default timeout. */
def receiveMessage(): M
/** Receive any message with a custom timeout. */
def receiveMessage(max: FiniteDuration): M
/** Receive `n` messages in sequence with the default timeout. */
def receiveMessages(n: Int): immutable.Seq[M]
/** Receive `n` messages in sequence with a custom timeout. */
def receiveMessages(n: Int, max: FiniteDuration): immutable.Seq[M]
/** Selective message collection: `fisher` returns a FishingOutcome for each message; returns the collected messages. */
def fishForMessage(max: FiniteDuration)(fisher: M => FishingOutcome): immutable.Seq[M]
/** Selective message collection with a hint (presumably shown in the failure message; confirm). */
def fishForMessage(max: FiniteDuration, hint: String)(fisher: M => FishingOutcome): immutable.Seq[M]
/** Selective message collection where `fisher` is a partial function returning a FishingOutcome. */
def fishForMessagePF(max: FiniteDuration)(fisher: PartialFunction[M, FishingOutcome]): immutable.Seq[M]
/** Selective message collection with a partial function and a hint (presumably shown in the failure message; confirm). */
def fishForMessagePF(max: FiniteDuration, hint: String)(fisher: PartialFunction[M, FishingOutcome]): immutable.Seq[M]
/** Expect the given actor to terminate within the default timeout. */
def expectTerminated[U](actorRef: ActorRef[U]): Unit
/** Expect the given actor to terminate within a custom timeout. */
def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit
/** Retry the assertion until it succeeds, using the default timeout and interval; returns its result. */
def awaitAssert[A](a: => A): A
/** Retry the assertion until it succeeds or `max` elapses; returns its result. */
def awaitAssert[A](a: => A, max: FiniteDuration): A
/** Retry the assertion until it succeeds or `max` elapses, waiting `interval` between attempts; returns its result. */
def awaitAssert[A](a: => A, max: FiniteDuration, interval: FiniteDuration): A
/** Stop the test probe (useful for testing watch/termination). */
def stop(): Unit
}

Usage Examples:
import akka.actor.testkit.typed.scaladsl.{ActorTestKit, TestProbe}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._
// Start the test kit that owns the probe and the spawned actor
val testKit = ActorTestKit()
// Create a probe that accepts String messages
val probe = testKit.createTestProbe[String]()
// Create a behavior that forwards every received message to the probe, prefixed with "Echo: "
val senderBehavior = Behaviors.receiveMessage[String] { msg =>
probe.ref ! s"Echo: $msg"
Behaviors.same
}
val sender = testKit.spawn(senderBehavior)
// Test basic message expectation (default timeout)
sender ! "Hello"
probe.expectMessage("Echo: Hello")
// Test with timeout: the send and the expectation must both finish within the 1 second bound
probe.within(1.second) {
sender ! "World"
probe.expectMessage("Echo: World")
}
// Test no message: nothing else may arrive at the probe during the next 100 ms
probe.expectNoMessage(100.millis)
testKit.shutdownTestKit()

Core functionality for expecting and asserting on specific messages with various matching strategies.
/**
* Expect a specific message, waiting up to the default timeout.
*
* @param obj Expected message object
* @return The received message if it matches
*/
def expectMessage[T <: M](obj: T): T
/**
* Expect a specific message, waiting up to a custom timeout.
*
* @param max Maximum wait time
* @param obj Expected message object
* @return The received message if it matches
*/
def expectMessage[T <: M](max: FiniteDuration, obj: T): T
/**
* Expect a message of a specific type, identified through the implicit ClassTag,
* waiting up to the default timeout.
*
* @tparam T Expected message type
* @param t Implicit ClassTag for the expected type
* @return The received message cast to the expected type
*/
def expectMessageType[T <: M](implicit t: ClassTag[T]): T
/**
* Expect a message of a specific type, waiting up to a custom timeout.
*
* @tparam T Expected message type
* @param max Maximum wait time
* @param t Implicit ClassTag for the expected type
* @return The received message cast to the expected type
*/
def expectMessageType[T <: M](max: FiniteDuration)(implicit t: ClassTag[T]): T
/**
* Assert that no messages are received within the specified time.
*
* @param max Time to wait for messages (not dilated)
*/
def expectNoMessage(max: FiniteDuration): Unit
/**
* Assert that no messages are received for the default period
*/
def expectNoMessage(): Unit

Usage Examples:
import akka.actor.testkit.typed.scaladsl.TestProbe
// `testKit` and `someActor` are not defined in this snippet; they are assumed to exist
// already, with `someActor` replying to this probe (confirm against the surrounding test).
val probe = testKit.createTestProbe[Any]()
// Expect a specific string message in reply to the request
someActor ! "request"
probe.expectMessage("response")
// Expect a message by type rather than by value
case class Response(data: String)
someActor ! "get-data"
// expectMessageType returns the message cast to Response, so its fields can be asserted on
val response = probe.expectMessageType[Response]
assert(response.data == "some data")
// Assert no messages for 100ms
probe.expectNoMessage(100.millis)

Advanced message collection with selective filtering using fishing patterns for complex message scenarios.
/**
* Receive any message of type M, waiting up to the default timeout.
*
* @return The received message
*/
def receiveMessage(): M
/**
* Receive multiple messages in sequence, waiting up to the default timeout.
*
* @param n Number of messages to receive
* @return Sequence of received messages
*/
def receiveMessages(n: Int): immutable.Seq[M]
/**
* Selective message collection using a fishing function.
*
* The function is applied to each message and its FishingOutcome decides
* whether fishing continues, completes or fails.
*
* @param max Maximum time to fish for messages
* @param fisher Function returning FishingOutcome for each message
* @return Sequence of collected messages
*/
def fishForMessage(max: FiniteDuration)(fisher: M => FishingOutcome): immutable.Seq[M]
/**
* Selective message collection using partial function
* @param max Maximum time to fish for messages
* @param fisher Partial function returning FishingOutcome
* @return Sequence of collected messages
*/
def fishForMessagePF(max: FiniteDuration)(fisher: PartialFunction[M, FishingOutcome]): immutable.Seq[M]

Usage Examples:
import akka.actor.testkit.typed.scaladsl.{TestProbe, FishingOutcomes}
// `testKit` and `actor` are not defined in this snippet; `actor` is assumed to
// respond to this probe (confirm against the surrounding test).
val probe = testKit.createTestProbe[String]()
// Send multiple messages to actor that will respond
actor ! "msg1"
actor ! "msg2"
actor ! "done"
// Collect all messages until "done": every other message is collected and fishing continues
val messages = probe.fishForMessage(3.seconds) {
case "done" => FishingOutcomes.complete
case msg => FishingOutcomes.continue
}
// Collect only specific messages: "important..." messages are collected,
// "stop" completes the fishing, and anything else is consumed without being collected
val filtered = probe.fishForMessagePF(3.seconds) {
case msg if msg.startsWith("important") => FishingOutcomes.continue
case "stop" => FishingOutcomes.complete
case _ => FishingOutcomes.continueAndIgnore // Don't collect this message
}

Control values for directing fishing behavior during selective message collection.
/**
* Factory for FishingOutcome values.
*
* A fishing function returns one of these for each message to direct
* selective message collection.
*/
object FishingOutcomes {
/** Complete fishing and return all collected messages */
val complete: FishingOutcome
/** Continue fishing and collect this message */
val continue: FishingOutcome
/** Continue fishing but don't collect this message */
val continueAndIgnore: FishingOutcome
/**
* Fail fishing with a custom error message.
*
* @param message Error message describing the failure
* @return An outcome that fails the fishing
*/
def fail(message: String): FishingOutcome
}
/** Outcome returned by a fishing function for each message; sealed, so the set of variants is closed. */
sealed trait FishingOutcome
/** The concrete FishingOutcome variants. */
object FishingOutcome {
/** Fishing is finished. */
case object Complete extends FishingOutcome
/** Keep fishing; presumably the variant behind `FishingOutcomes.continue` (confirm). */
case object Continue extends FishingOutcome
/** Keep fishing; presumably the variant behind `FishingOutcomes.continueAndIgnore` (confirm). */
case object ContinueAndIgnore extends FishingOutcome
/**
* Fishing failed.
* NOTE(review): confirm the field name against the library source; it may be
* declared as `error` rather than `message` upstream.
*/
case class Fail(message: String) extends FishingOutcome
}

Utilities for controlling test execution timing and setting time bounds for operations.
/**
* Execute code block while bounding execution time between min and max.
*
* @tparam T Result type of the code block
* @param min Minimum execution time (not dilated)
* @param max Maximum execution time (dilated)
* @param f Code block to execute
* @return Result of code block execution
*/
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
/**
* Execute code block within maximum time bound.
*
* @tparam T Result type of the code block
* @param max Maximum execution time (dilated)
* @param f Code block to execute
* @return Result of code block execution
*/
def within[T](max: FiniteDuration)(f: => T): T
/**
* Get time remaining for execution of innermost enclosing within block,
* falling back to the default when no within block is active.
*
* @return Remaining time or default if no within block
*/
def remainingOrDefault: FiniteDuration
/**
* Get time remaining for execution of innermost enclosing within block.
*
* @return Remaining time (throws if no within block)
*/
def remaining: FiniteDuration
/**
* Get time remaining or specified duration if no within block
* @param duration Fallback duration
* @return Remaining time or fallback duration
*/
def remainingOr(duration: FiniteDuration): FiniteDuration

Utilities for testing actor lifecycle and implementing retry logic for eventually consistent assertions.
/**
* Expect actor to terminate within default timeout.
*
* @tparam U Message type of the watched actor
* @param actorRef Actor to watch for termination
*/
def expectTerminated[U](actorRef: ActorRef[U]): Unit
/**
* Expect actor to terminate within custom timeout.
*
* @tparam U Message type of the watched actor
* @param actorRef Actor to watch for termination
* @param max Maximum time to wait for termination
*/
def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit
/**
* Retry assertion until success with default timeout and interval.
*
* @tparam A Result type of the assertion
* @param a Assertion to retry (by-name, so it is re-evaluated on each attempt)
* @return Result of successful assertion
*/
def awaitAssert[A](a: => A): A
/**
* Retry assertion until success with custom timeout.
*
* @tparam A Result type of the assertion
* @param a Assertion to retry (by-name, so it is re-evaluated on each attempt)
* @param max Maximum time to retry
* @return Result of successful assertion
*/
def awaitAssert[A](a: => A, max: FiniteDuration): A
/**
* Retry assertion until success with custom timeout and interval.
*
* @tparam A Result type of the assertion
* @param a Assertion to retry (by-name, so it is re-evaluated on each attempt)
* @param max Maximum time to retry
* @param interval Time between retry attempts
* @return Result of successful assertion
*/
def awaitAssert[A](a: => A, max: FiniteDuration, interval: FiniteDuration): A
/**
* Stop the test probe (useful for testing watch/termination)
*/
def stop(): Unit

Usage Examples:
import akka.actor.testkit.typed.scaladsl.TestProbe
// `testKit`, `someBehavior`, `getEventualState` and `someWatchingActor` are not
// defined in this snippet; they are assumed to come from the surrounding test.
val probe = testKit.createTestProbe[String]()

// Test actor termination.
// Akka Typed has no PoisonPill message, so the actor is stopped through the
// test kit instead (this only works for actors spawned by the same test kit).
val actor = testKit.spawn(someBehavior)
testKit.stop(actor)
probe.expectTerminated(actor, 3.seconds)

// Retry assertion for eventually consistent behavior.
// The by-name assertion block and the named arguments must share one
// parenthesised argument list; arguments cannot follow a bare `{ ... }` block.
probe.awaitAssert({
  // This assertion will be retried until it succeeds or timeout
  val state = getEventualState()
  assert(state.isReady)
}, max = 5.seconds, interval = 100.millis)

// Stop probe for termination testing
probe.stop()
// Presumably `someWatchingActor` watches the ref it receives and is therefore
// notified that the probe has terminated (confirm against its behavior).
someWatchingActor ! probe.ref