or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

actor-references.mdcore-testing.mddeterministic-execution.mdevent-filtering.mdindex.mdjava-api.mdtest-utilities.mdutilities-config.md
tile.json

tessl/maven-com-typesafe-akka--akka-testkit_2-11

Comprehensive testing toolkit for Akka actor-based systems with synchronous testing utilities, event filtering, and deterministic execution control

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/com.typesafe.akka/akka-testkit_2.11@2.5.x

To install, run

npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-testkit_2-11@2.5.0

index.mddocs/

Akka TestKit

Akka TestKit is a comprehensive testing toolkit designed specifically for testing actor-based concurrent and distributed systems built with the Akka framework. It provides essential testing utilities including synchronous message sending and receiving, controllable test actors, event filtering, and deterministic execution control for reliable testing of complex actor interactions and system behaviors.

Package Information

  • Package Name: akka-testkit
  • Package Type: maven
  • Language: Scala (with Java API)
  • Installation: libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % "2.5.32" % Test

Core Imports

import akka.testkit._
import akka.actor.ActorSystem
import akka.actor.Props

For implicit conversions and utilities:

import akka.testkit.TestDuration._
import scala.concurrent.duration._

Java API imports:

import akka.testkit.javadsl.TestKit
import akka.testkit.javadsl.EventFilter

Basic Usage

import akka.testkit.{TestKit, ImplicitSender}
import akka.actor.{ActorSystem, Actor, Props}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

class MyActorSpec extends TestKit(ActorSystem("TestSystem"))
  with ImplicitSender
  with WordSpecLike
  with Matchers
  with BeforeAndAfterAll {

  override def afterAll: Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "MyActor" must {
    "respond to ping with pong" in {
      val actor = system.actorOf(Props[MyActor])
      actor ! "ping"
      expectMsg("pong")
    }
  }
}

Architecture

Akka TestKit provides several layers of testing infrastructure:

  • TestKit Framework: Core testing classes with assertion methods and timing control
  • Specialized Actor References: TestActorRef and TestFSMRef for synchronous testing access
  • Test Utilities: Probes, barriers, latches, and common test actor patterns
  • Deterministic Execution: Custom dispatchers and schedulers for predictable testing
  • Event System: Comprehensive logging event filtering and interception
  • Java Compatibility: Full Java API with modern Java 8+ features

Core Testing Framework

The foundational testing infrastructure provides synchronous message handling, timing control, and comprehensive assertion methods for actor system testing.

TestKit and TestKitBase { .api }

class TestKit(_system: ActorSystem) extends TestKitBase

trait TestKitBase {
  implicit val system: ActorSystem
  val testKitSettings: TestKitSettings
  val testActor: ActorRef
  def lastSender: ActorRef
  
  // Message expectations
  def expectMsg[T](obj: T): T
  def expectMsg[T](max: FiniteDuration, obj: T): T
  def expectMsg[T](max: FiniteDuration, hint: String, obj: T): T
  def expectMsgType[T](implicit t: ClassTag[T]): T
  def expectMsgType[T](max: FiniteDuration)(implicit t: ClassTag[T]): T
  def expectMsgClass[C](c: Class[C]): C
  def expectMsgClass[C](max: FiniteDuration, c: Class[C]): C
  def expectMsgAnyOf[T](objs: T*): T
  def expectMsgAnyOf[T](max: FiniteDuration, objs: T*): T
  def expectMsgAllOf[T](objs: T*): immutable.Seq[T]
  def expectMsgAllOf[T](max: FiniteDuration, objs: T*): immutable.Seq[T]
  def expectMsgPF[T](max: Duration = Duration.Undefined, hint: String = "")(f: PartialFunction[Any, T]): T
  def expectTerminated(target: ActorRef, max: Duration = Duration.Undefined): Terminated
  
  // No message expectations
  @deprecated("Use expectNoMessage instead", "2.5.5")
  def expectNoMsg(): Unit
  @deprecated("Use expectNoMessage instead", "2.5.5")
  def expectNoMsg(max: FiniteDuration): Unit
  def expectNoMessage(): Unit
  def expectNoMessage(max: FiniteDuration): Unit
  
  // Message reception
  def receiveOne(max: Duration): AnyRef
  def receiveN(n: Int): immutable.Seq[AnyRef]
  def receiveN(n: Int, max: FiniteDuration): immutable.Seq[AnyRef]
  def receiveWhile[T](max: Duration = Duration.Undefined, idle: Duration = Duration.Inf, messages: Int = Int.MaxValue)(f: PartialFunction[AnyRef, T]): immutable.Seq[T]
  def fishForMessage(max: Duration = Duration.Undefined, hint: String = "")(fisher: PartialFunction[Any, Boolean]): Any
  def fishForSpecificMessage[T](max: Duration = Duration.Undefined, hint: String = "")(f: PartialFunction[Any, T]): T
  
  // Timing control
  def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
  def within[T](max: FiniteDuration)(f: => T): T
  def awaitCond(p: => Boolean, max: Duration = Duration.Undefined, interval: Duration = 100.millis, message: String = ""): Unit
  def awaitAssert[A](a: => A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A
  def remaining: FiniteDuration
  def remainingOr(duration: FiniteDuration): FiniteDuration
  def remainingOrDefault: FiniteDuration
  
  // Actor lifecycle
  def watch(ref: ActorRef): ActorRef
  def unwatch(ref: ActorRef): ActorRef
  def childActorOf(props: Props): ActorRef
  def childActorOf(props: Props, name: String): ActorRef
  def childActorOf(props: Props, supervisorStrategy: SupervisorStrategy): ActorRef
  def childActorOf(props: Props, name: String, supervisorStrategy: SupervisorStrategy): ActorRef
  
  // Message control
  def ignoreMsg(f: PartialFunction[Any, Boolean]): Unit
  def ignoreNoMsg(): Unit
  def setAutoPilot(pilot: TestActor.AutoPilot): Unit
}

TestProbe { .api }

class TestProbe(_application: ActorSystem, name: String) extends TestKitBase {
  def ref: ActorRef
  def send(actor: ActorRef, msg: Any): Unit
  def forward(actor: ActorRef, msg: Any): Unit
  def sender(): ActorRef
  def reply(msg: Any): Unit
}

object TestProbe {
  def apply()(implicit system: ActorSystem): TestProbe
  def apply(name: String)(implicit system: ActorSystem): TestProbe
}

Core Testing Framework

Specialized Actor References

Specialized actor references that provide synchronous access to actor internals and state for detailed testing scenarios.

TestActorRef { .api }

class TestActorRef[T <: Actor](_system: ActorSystem, _props: Props, _supervisor: ActorRef, name: String) extends InternalActorRef {
  def receive(o: Any): Unit
  def receive(o: Any, sender: ActorRef): Unit
  def underlyingActor: T
  def watch(subject: ActorRef): ActorRef
  def unwatch(subject: ActorRef): ActorRef
}

object TestActorRef {
  def apply[T <: Actor](props: Props)(implicit system: ActorSystem): TestActorRef[T]
  def apply[T <: Actor](props: Props, name: String)(implicit system: ActorSystem): TestActorRef[T]
  def apply[T <: Actor](props: Props, supervisor: ActorRef)(implicit system: ActorSystem): TestActorRef[T]
  def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T]
}

TestFSMRef { .api }

class TestFSMRef[S, D, T <: Actor](system: ActorSystem, props: Props, supervisor: ActorRef, name: String) extends TestActorRef[T] {
  def stateName: S
  def stateData: D
  def setState(stateName: S, stateData: D, timeout: FiniteDuration, stopReason: Option[FSM.Reason]): Unit
  def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean): Unit
  def cancelTimer(name: String): Unit
  def isTimerActive(name: String): Boolean
  def isStateTimerActive: Boolean
}

object TestFSMRef {
  def apply[S, D, T <: Actor: ClassTag](factory: => T)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T]
  def apply[S, D, T <: Actor: ClassTag](factory: => T, name: String)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T]
  def apply[S, D, T <: Actor: ClassTag](factory: => T, supervisor: ActorRef)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T]
  def apply[S, D, T <: Actor: ClassTag](factory: => T, supervisor: ActorRef, name: String)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T]
}

Specialized Actor References

Test Utilities and Coordination

Helper classes and utilities for test coordination, synchronization, and common testing patterns.

TestActors { .api }

object TestActors {
  val echoActorProps: Props
  val blackholeProps: Props
  def forwardActorProps(ref: ActorRef): Props
}

TestBarrier { .api }

class TestBarrier(count: Int) {
  def await()(implicit system: ActorSystem): Unit
  def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit
  def reset(): Unit
}

object TestBarrier {
  val DefaultTimeout: Duration
  def apply(count: Int): TestBarrier
}

TestLatch { .api }

class TestLatch(count: Int)(implicit system: ActorSystem) extends Awaitable[Unit] {
  def countDown(): Unit
  def isOpen: Boolean
  def open(): Unit
  def reset(): Unit
  def ready(atMost: Duration)(implicit permit: CanAwait): TestLatch
  def result(atMost: Duration)(implicit permit: CanAwait): Unit
}

object TestLatch {
  val DefaultTimeout: Duration
  def apply(count: Int)(implicit system: ActorSystem): TestLatch
}

Test Utilities and Coordination

Deterministic Execution Control

Components for controlling timing and execution order in tests to ensure deterministic, reproducible test runs.

CallingThreadDispatcher { .api }

class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) extends MessageDispatcher

object CallingThreadDispatcher {
  val Id: String = "akka.test.calling-thread-dispatcher"
}

ExplicitlyTriggeredScheduler { .api }

class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends Scheduler {
  def timePasses(amount: FiniteDuration): Unit
  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
  def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
  def maxFrequency: Double
}

Deterministic Execution Control

Event Filtering and Testing

Comprehensive system for intercepting, filtering, and testing logging events and system messages during test execution.

EventFilter { .api }

abstract class EventFilter(occurrences: Int) {
  def awaitDone(max: Duration): Boolean
  def assertDone(max: Duration): Unit
  def intercept[T](code: => T)(implicit system: ActorSystem): T
}

object EventFilter {
  // Exception-based filters
  def apply[A <: Throwable: ClassTag](
    message: String = null, 
    source: String = null, 
    start: String = "", 
    pattern: String = null, 
    occurrences: Int = Int.MaxValue): EventFilter
  
  // Message-based filters with three matching modes
  def error(
    message: String = null, 
    source: String = null, 
    start: String = "", 
    pattern: String = null, 
    occurrences: Int = Int.MaxValue): EventFilter
  def warning(
    message: String = null, 
    source: String = null, 
    start: String = "", 
    pattern: String = null, 
    occurrences: Int = Int.MaxValue): EventFilter
  def info(
    message: String = null, 
    source: String = null, 
    start: String = "", 
    pattern: String = null, 
    occurrences: Int = Int.MaxValue): EventFilter
  def debug(
    message: String = null, 
    source: String = null, 
    start: String = "", 
    pattern: String = null, 
    occurrences: Int = Int.MaxValue): EventFilter
  
  // Custom filter
  def custom(test: PartialFunction[LogEvent, Boolean], occurrences: Int = Int.MaxValue): EventFilter
}

Package-level filtering functions { .api }

// In akka.testkit package object
def filterEvents[T](eventFilters: Iterable[EventFilter])(block: => T)(implicit system: ActorSystem): T
def filterEvents[T](eventFilters: EventFilter*)(block: => T)(implicit system: ActorSystem): T
def filterException[T <: Throwable](block: => Unit)(implicit system: ActorSystem, t: ClassTag[T]): Unit

Event Filtering and Testing

Java API

Modern Java 8+ compatible API providing the same functionality as the Scala API with Java-friendly method signatures and types.

javadsl.TestKit { .api }

class TestKit(system: ActorSystem) {
  // Java Duration-based methods
  def expectMsg(duration: java.time.Duration, obj: AnyRef): AnyRef
  def expectMsgClass(duration: java.time.Duration, clazz: Class[_]): AnyRef
  def expectNoMessage(duration: java.time.Duration): Unit
  def within(duration: java.time.Duration, supplier: Supplier[_]): AnyRef
}

javadsl.EventFilter { .api }

class EventFilter(clazz: Class[_], system: ActorSystem) {
  def message(pattern: String): EventFilter
  def source(source: String): EventFilter
  def occurrences(count: Int): EventFilter
  def intercept[T](supplier: Supplier[T]): T
}

Java API

Utility Classes and Configuration

Supporting classes for configuration, serialization, networking, and timing utilities.

TestKitExtension and Settings { .api }

object TestKitExtension extends ExtensionId[TestKitSettings] {
  def get(system: ActorSystem): TestKitSettings
  def get(system: ClassicActorSystemProvider): TestKitSettings
}

class TestKitSettings(config: Config) {
  val TestTimeFactor: Double
  val SingleExpectDefaultTimeout: FiniteDuration
  val TestEventFilterLeeway: FiniteDuration
  val DefaultTimeout: Timeout
}

SocketUtil { .api }

object SocketUtil {
  def temporaryLocalPort(udp: Boolean = false): Int
  def temporaryLocalPort(protocol: Protocol): Int
  def temporaryServerAddress(address: String = "127.0.0.1", udp: Boolean = false): InetSocketAddress
  def temporaryServerAddresses(numberOfAddresses: Int, hostname: String, udp: Boolean): immutable.IndexedSeq[InetSocketAddress]
  
  sealed trait Protocol
  case object Tcp extends Protocol
  case object Udp extends Protocol
  case object Both extends Protocol
}

TestDuration (implicit class) { .api }

// In akka.testkit package object
implicit class TestDuration(val duration: FiniteDuration) extends AnyVal {
  def dilated(implicit system: ActorSystem): FiniteDuration
}

Utility Classes and Configuration