Comprehensive testing toolkit for Akka actor-based systems with synchronous testing utilities, event filtering, and deterministic execution control
npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-testkit_2-11@2.5.0Akka TestKit is a comprehensive testing toolkit designed specifically for testing actor-based concurrent and distributed systems built with the Akka framework. It provides essential testing utilities including synchronous message sending and receiving, controllable test actors, event filtering, and deterministic execution control for reliable testing of complex actor interactions and system behaviors.
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % "2.5.32" % Testimport akka.testkit._
import akka.actor.ActorSystem
import akka.actor.PropsFor implicit conversions and utilities:
import akka.testkit.TestDuration._
import scala.concurrent.duration._Java API imports:
import akka.testkit.javadsl.TestKit
import akka.testkit.javadsl.EventFilterimport akka.testkit.{TestKit, ImplicitSender}
import akka.actor.{ActorSystem, Actor, Props}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
class MyActorSpec extends TestKit(ActorSystem("TestSystem"))
with ImplicitSender
with WordSpecLike
with Matchers
with BeforeAndAfterAll {
override def afterAll: Unit = {
TestKit.shutdownActorSystem(system)
}
"MyActor" must {
"respond to ping with pong" in {
val actor = system.actorOf(Props[MyActor])
actor ! "ping"
expectMsg("pong")
}
}
}Akka TestKit provides several layers of testing infrastructure:
The foundational testing infrastructure provides synchronous message handling, timing control, and comprehensive assertion methods for actor system testing.
class TestKit(_system: ActorSystem) extends TestKitBase
trait TestKitBase {
implicit val system: ActorSystem
val testKitSettings: TestKitSettings
val testActor: ActorRef
def lastSender: ActorRef
// Message expectations
def expectMsg[T](obj: T): T
def expectMsg[T](max: FiniteDuration, obj: T): T
def expectMsg[T](max: FiniteDuration, hint: String, obj: T): T
def expectMsgType[T](implicit t: ClassTag[T]): T
def expectMsgType[T](max: FiniteDuration)(implicit t: ClassTag[T]): T
def expectMsgClass[C](c: Class[C]): C
def expectMsgClass[C](max: FiniteDuration, c: Class[C]): C
def expectMsgAnyOf[T](objs: T*): T
def expectMsgAnyOf[T](max: FiniteDuration, objs: T*): T
def expectMsgAllOf[T](objs: T*): immutable.Seq[T]
def expectMsgAllOf[T](max: FiniteDuration, objs: T*): immutable.Seq[T]
def expectMsgPF[T](max: Duration = Duration.Undefined, hint: String = "")(f: PartialFunction[Any, T]): T
def expectTerminated(target: ActorRef, max: Duration = Duration.Undefined): Terminated
// No message expectations
@deprecated("Use expectNoMessage instead", "2.5.5")
def expectNoMsg(): Unit
@deprecated("Use expectNoMessage instead", "2.5.5")
def expectNoMsg(max: FiniteDuration): Unit
def expectNoMessage(): Unit
def expectNoMessage(max: FiniteDuration): Unit
// Message reception
def receiveOne(max: Duration): AnyRef
def receiveN(n: Int): immutable.Seq[AnyRef]
def receiveN(n: Int, max: FiniteDuration): immutable.Seq[AnyRef]
def receiveWhile[T](max: Duration = Duration.Undefined, idle: Duration = Duration.Inf, messages: Int = Int.MaxValue)(f: PartialFunction[AnyRef, T]): immutable.Seq[T]
def fishForMessage(max: Duration = Duration.Undefined, hint: String = "")(fisher: PartialFunction[Any, Boolean]): Any
def fishForSpecificMessage[T](max: Duration = Duration.Undefined, hint: String = "")(f: PartialFunction[Any, T]): T
// Timing control
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
def within[T](max: FiniteDuration)(f: => T): T
def awaitCond(p: => Boolean, max: Duration = Duration.Undefined, interval: Duration = 100.millis, message: String = ""): Unit
def awaitAssert[A](a: => A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A
def remaining: FiniteDuration
def remainingOr(duration: FiniteDuration): FiniteDuration
def remainingOrDefault: FiniteDuration
// Actor lifecycle
def watch(ref: ActorRef): ActorRef
def unwatch(ref: ActorRef): ActorRef
def childActorOf(props: Props): ActorRef
def childActorOf(props: Props, name: String): ActorRef
def childActorOf(props: Props, supervisorStrategy: SupervisorStrategy): ActorRef
def childActorOf(props: Props, name: String, supervisorStrategy: SupervisorStrategy): ActorRef
// Message control
def ignoreMsg(f: PartialFunction[Any, Boolean]): Unit
def ignoreNoMsg(): Unit
def setAutoPilot(pilot: TestActor.AutoPilot): Unit
}class TestProbe(_application: ActorSystem, name: String) extends TestKitBase {
def ref: ActorRef
def send(actor: ActorRef, msg: Any): Unit
def forward(actor: ActorRef, msg: Any): Unit
def sender(): ActorRef
def reply(msg: Any): Unit
}
object TestProbe {
def apply()(implicit system: ActorSystem): TestProbe
def apply(name: String)(implicit system: ActorSystem): TestProbe
}Specialized actor references that provide synchronous access to actor internals and state for detailed testing scenarios.
class TestActorRef[T <: Actor](_system: ActorSystem, _props: Props, _supervisor: ActorRef, name: String) extends InternalActorRef {
def receive(o: Any): Unit
def receive(o: Any, sender: ActorRef): Unit
def underlyingActor: T
def watch(subject: ActorRef): ActorRef
def unwatch(subject: ActorRef): ActorRef
}
object TestActorRef {
def apply[T <: Actor](props: Props)(implicit system: ActorSystem): TestActorRef[T]
def apply[T <: Actor](props: Props, name: String)(implicit system: ActorSystem): TestActorRef[T]
def apply[T <: Actor](props: Props, supervisor: ActorRef)(implicit system: ActorSystem): TestActorRef[T]
def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T]
}class TestFSMRef[S, D, T <: Actor](system: ActorSystem, props: Props, supervisor: ActorRef, name: String) extends TestActorRef[T] {
def stateName: S
def stateData: D
def setState(stateName: S, stateData: D, timeout: FiniteDuration, stopReason: Option[FSM.Reason]): Unit
def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean): Unit
def cancelTimer(name: String): Unit
def isTimerActive(name: String): Boolean
def isStateTimerActive: Boolean
}
object TestFSMRef {
def apply[S, D, T <: Actor: ClassTag](factory: => T)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T]
def apply[S, D, T <: Actor: ClassTag](factory: => T, name: String)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T]
def apply[S, D, T <: Actor: ClassTag](factory: => T, supervisor: ActorRef)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T]
def apply[S, D, T <: Actor: ClassTag](factory: => T, supervisor: ActorRef, name: String)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T]
}Helper classes and utilities for test coordination, synchronization, and common testing patterns.
object TestActors {
val echoActorProps: Props
val blackholeProps: Props
def forwardActorProps(ref: ActorRef): Props
}class TestBarrier(count: Int) {
def await()(implicit system: ActorSystem): Unit
def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit
def reset(): Unit
}
object TestBarrier {
val DefaultTimeout: Duration
def apply(count: Int): TestBarrier
}class TestLatch(count: Int)(implicit system: ActorSystem) extends Awaitable[Unit] {
def countDown(): Unit
def isOpen: Boolean
def open(): Unit
def reset(): Unit
def ready(atMost: Duration)(implicit permit: CanAwait): TestLatch
def result(atMost: Duration)(implicit permit: CanAwait): Unit
}
object TestLatch {
val DefaultTimeout: Duration
def apply(count: Int)(implicit system: ActorSystem): TestLatch
}Test Utilities and Coordination
Components for controlling timing and execution order in tests to ensure deterministic, reproducible test runs.
class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) extends MessageDispatcher
object CallingThreadDispatcher {
val Id: String = "akka.test.calling-thread-dispatcher"
}class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends Scheduler {
def timePasses(amount: FiniteDuration): Unit
def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
def maxFrequency: Double
}Deterministic Execution Control
Comprehensive system for intercepting, filtering, and testing logging events and system messages during test execution.
abstract class EventFilter(occurrences: Int) {
def awaitDone(max: Duration): Boolean
def assertDone(max: Duration): Unit
def intercept[T](code: => T)(implicit system: ActorSystem): T
}
object EventFilter {
// Exception-based filters
def apply[A <: Throwable: ClassTag](
message: String = null,
source: String = null,
start: String = "",
pattern: String = null,
occurrences: Int = Int.MaxValue): EventFilter
// Message-based filters with three matching modes
def error(
message: String = null,
source: String = null,
start: String = "",
pattern: String = null,
occurrences: Int = Int.MaxValue): EventFilter
def warning(
message: String = null,
source: String = null,
start: String = "",
pattern: String = null,
occurrences: Int = Int.MaxValue): EventFilter
def info(
message: String = null,
source: String = null,
start: String = "",
pattern: String = null,
occurrences: Int = Int.MaxValue): EventFilter
def debug(
message: String = null,
source: String = null,
start: String = "",
pattern: String = null,
occurrences: Int = Int.MaxValue): EventFilter
// Custom filter
def custom(test: PartialFunction[LogEvent, Boolean], occurrences: Int = Int.MaxValue): EventFilter
}// In akka.testkit package object
def filterEvents[T](eventFilters: Iterable[EventFilter])(block: => T)(implicit system: ActorSystem): T
def filterEvents[T](eventFilters: EventFilter*)(block: => T)(implicit system: ActorSystem): T
def filterException[T <: Throwable](block: => Unit)(implicit system: ActorSystem, t: ClassTag[T]): UnitModern Java 8+ compatible API providing the same functionality as the Scala API with Java-friendly method signatures and types.
class TestKit(system: ActorSystem) {
// Java Duration-based methods
def expectMsg(duration: java.time.Duration, obj: AnyRef): AnyRef
def expectMsgClass(duration: java.time.Duration, clazz: Class[_]): AnyRef
def expectNoMessage(duration: java.time.Duration): Unit
def within(duration: java.time.Duration, supplier: Supplier[_]): AnyRef
}class EventFilter(clazz: Class[_], system: ActorSystem) {
def message(pattern: String): EventFilter
def source(source: String): EventFilter
def occurrences(count: Int): EventFilter
def intercept[T](supplier: Supplier[T]): T
}Supporting classes for configuration, serialization, networking, and timing utilities.
object TestKitExtension extends ExtensionId[TestKitSettings] {
def get(system: ActorSystem): TestKitSettings
def get(system: ClassicActorSystemProvider): TestKitSettings
}
class TestKitSettings(config: Config) {
val TestTimeFactor: Double
val SingleExpectDefaultTimeout: FiniteDuration
val TestEventFilterLeeway: FiniteDuration
val DefaultTimeout: Timeout
}object SocketUtil {
def temporaryLocalPort(udp: Boolean = false): Int
def temporaryLocalPort(protocol: Protocol): Int
def temporaryServerAddress(address: String = "127.0.0.1", udp: Boolean = false): InetSocketAddress
def temporaryServerAddresses(numberOfAddresses: Int, hostname: String, udp: Boolean): immutable.IndexedSeq[InetSocketAddress]
sealed trait Protocol
case object Tcp extends Protocol
case object Udp extends Protocol
case object Both extends Protocol
}// In akka.testkit package object
implicit class TestDuration(val duration: FiniteDuration) extends AnyVal {
def dilated(implicit system: ActorSystem): FiniteDuration
}