Akka TestKit - A comprehensive testing toolkit for Actor-based systems built with the Akka framework
Akka TestKit provides various utility classes, actors, and functions for common testing patterns including pre-built test actors, network utilities, serialization support, and configuration helpers.
Collection of pre-built actor implementations for common testing patterns and scenarios.
/**
 * Factory object exposing Props for the pre-built test actors.
 * API listing: members are shown as signatures only, without bodies.
 */
object TestActors {
  /**
   * Props for EchoActor, which sends every received message back unmodified.
   * Useful for testing message routing and actor communication.
   */
  val echoActorProps: Props
  /**
   * Props for BlackholeActor, which ignores all incoming messages.
   * Useful for testing scenarios where messages should be discarded.
   */
  val blackholeProps: Props
  /**
   * Props for ForwardActor, which forwards all messages to the specified ActorRef.
   * @param ref Target ActorRef to forward messages to
   * @return Props for a ForwardActor configured with that target
   */
  def forwardActorProps(ref: ActorRef): Props
}
/**
 * Replies to every message with that very same message.
 * The reply is addressed to the sender of the incoming message, which makes
 * this actor a convenient counterpart for request-response tests.
 */
class EchoActor extends Actor {
  def receive: Receive = {
    // Catch-all: hand the payload straight back, with this actor as the sender.
    case incoming => sender().tell(incoming, self)
  }
}
/**
 * Message sink: accepts any message and does nothing with it.
 * Handy as a dead-end routing target when replies are not wanted.
 */
class BlackholeActor extends Actor {
  def receive: Receive = {
    // Every message matches and is dropped without a reply.
    case _ => ()
  }
}
/**
 * Relays every received message to a fixed target.
 * The original sender is kept, so the target can reply to it directly.
 * @param ref Target ActorRef that receives the relayed messages
 */
class ForwardActor(ref: ActorRef) extends Actor {
  def receive: Receive = {
    // Pass the message on while presenting the original sender to the target.
    case incoming => ref.tell(incoming, sender())
  }
}

Utilities for network testing including port allocation and address management.
/**
 * Helpers for obtaining free ports and socket addresses in network tests.
 * API listing: methods are shown as signatures only, without bodies.
 */
object SocketUtil {
  /**
   * Protocol selector used when asking for a free port.
   * Sealed: Tcp, Udp and Both are the only variants.
   */
  sealed trait Protocol
  case object Tcp extends Protocol
  case object Udp extends Protocol
  case object Both extends Protocol
  /**
   * Marker string that can be passed as an address to request a random loopback address.
   */
  val RANDOM_LOOPBACK_ADDRESS = "RANDOM_LOOPBACK_ADDRESS"
  /**
   * Get a temporary free local port.
   * @param udp Whether to get a UDP port (default false: TCP)
   * @return Available port number
   */
  def temporaryLocalPort(udp: Boolean = false): Int
  /**
   * Get a temporary free local port for a specific protocol.
   * @param protocol Protocol type (Tcp, Udp, or Both)
   * @return Available port number that is free for the specified protocol
   */
  def temporaryLocalPort(protocol: Protocol): Int
  /**
   * Get a temporary server address with a free port.
   * @param address Hostname or IP address (use RANDOM_LOOPBACK_ADDRESS for a random one)
   * @param udp Whether to get a UDP address (default false: TCP)
   * @return InetSocketAddress with a free port
   */
  def temporaryServerAddress(address: String = "localhost", udp: Boolean = false): InetSocketAddress
  /**
   * Get hostname and port for a temporary server.
   * @param interface Network interface name (defaults to "localhost")
   * @return Tuple of (hostname, port)
   */
  def temporaryServerHostnameAndPort(interface: String = "localhost"): (String, Int)
  /**
   * Get multiple temporary server addresses.
   * @param numberOfAddresses Number of addresses to generate
   * @param hostname Hostname used for all addresses
   * @param udp Whether to get UDP addresses (default false: TCP)
   * @return IndexedSeq of InetSocketAddress instances with free ports
   */
  def temporaryServerAddresses(numberOfAddresses: Int, hostname: String = "localhost",
  udp: Boolean = false): immutable.IndexedSeq[InetSocketAddress]
  /**
   * Get a server address that is not bound to any service.
   * NOTE(review): a zero-argument overload is declared right below, so a call written as
   * notBoundServerAddress() should select that overload and this default would never
   * apply - confirm against the real API.
   * @param address Hostname or IP address
   * @return InetSocketAddress with an available port
   */
  def notBoundServerAddress(address: String = "localhost"): InetSocketAddress
  /**
   * Get a server address that is not bound to any service (parameterless version).
   * @return InetSocketAddress with localhost and an available port
   */
  def notBoundServerAddress(): InetSocketAddress
  /**
   * Get a temporary UDP IPv6 port for a specific network interface.
   * @param iface Network interface to bind to
   * @return Available IPv6 UDP port number
   */
  def temporaryUdpIpv6Port(iface: NetworkInterface): Int
}

Predefined exception class for testing without stack trace overhead.
/**
 * Ready-made exception for test scenarios.
 * Mixes in NoStackTrace so that throwing it skips stack trace generation.
 * @param message Exception message, also handed to RuntimeException
 */
case class TestException(message: String)
    extends RuntimeException(message)
    with NoStackTrace
/**
 * Companion of TestException adding a no-argument factory.
 */
object TestException {
  /**
   * Build a TestException carrying the default message.
   * @return TestException whose message is "test"
   */
  def apply(): TestException = new TestException("test")
}

Utilities for testing serialization scenarios with Java serialization.
/**
 * Java serialization support for ad-hoc test messages.
 * Useful when testing serialization without defining custom serializers.
 * API listing: toBinary and fromBinary are shown as signatures only, without bodies.
 * @param system Extended actor system for serializer integration
 */
class TestJavaSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
  /**
   * Serializer identifier; this listing hard-codes it to 42.
   */
  def identifier: Int = 42
  /**
   * Serialize an object to a byte array using Java serialization.
   * @param o Object to serialize
   * @return Serialized byte array
   */
  def toBinary(o: AnyRef): Array[Byte]
  /**
   * Deserialize a byte array to an object using Java serialization.
   * @param bytes Serialized byte array
   * @param clazz Optional class hint for deserialization
   * @return Deserialized object
   */
  def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
  /**
   * Whether manifest information is included for deserialization; always false here.
   */
  def includeManifest: Boolean = false
}
/**
 * Marker trait for test messages that should go through Java serialization.
 * It declares no members; mixing it into a test message class is all that is needed.
 */
trait JavaSerializable extends Serializable

TestKit configuration settings and extension utilities.
/**
 * TestKit configuration settings.
 * API listing: the values are shown as typed declarations only, without initializers.
 * NOTE(review): TestKitExtension is declared as ExtensionId[TestKitSettings]; that
 * presumably requires this class to extend akka.actor.Extension, which this listing
 * omits - confirm against the real API.
 * @param config Configuration object with test settings
 */
class TestKitSettings(config: Config) {
  /**
   * Time scaling factor for tests - multiplies all timeouts.
   * Useful for running tests on slower systems.
   * The custom-configuration example in this document sets `akka.test.timefactor = 2.0`
   * and expects this value to be 2.0.
   */
  val TestTimeFactor: Double
  /**
   * Default timeout for single message expectations.
   * The custom-configuration example in this document sets
   * `akka.test.single-expect-default = 10s` and expects this value to be 10 seconds.
   */
  val SingleExpectDefaultTimeout: FiniteDuration
  /**
   * Default timeout for expectNoMessage operations.
   */
  val ExpectNoMessageDefaultTimeout: FiniteDuration
  /**
   * Grace period for event filters to complete.
   */
  val TestEventFilterLeeway: FiniteDuration
  /**
   * Default Akka timeout for various operations.
   */
  val DefaultTimeout: Timeout
}
/**
 * Extension id through which TestKitSettings are obtained for an actor system.
 */
object TestKitExtension extends ExtensionId[TestKitSettings] with ExtensionIdProvider {
  /**
   * Look up the TestKit settings of an actor system.
   * @param system ActorSystem whose settings are wanted
   * @return TestKitSettings instance
   */
  def get(system: ActorSystem): TestKitSettings

  /** This object is its own canonical extension id. */
  def lookup: TestKitExtension.type = this

  /** Build the settings from the configuration of the given actor system. */
  def createExtension(system: ExtendedActorSystem): TestKitSettings = {
    val systemConfig = system.settings.config
    new TestKitSettings(systemConfig)
  }
}

Miscellaneous utility functions for TestKit operations.
/**
 * Utility functions for TestKit operations.
 * No members are shown in this listing; the object body contains only the notes below.
 */
object TestKitUtils {
  // Various internal utility functions for TestKit implementation
  // Includes timing calculations, message queue management, etc.
}

Commonly used traits that provide implicit values for testing convenience.
/**
 * Makes testActor the implicit sender of messages sent from the test.
 * Mix it into a test class so that sends do not need an explicit sender.
 */
trait ImplicitSender extends TestKitBase {
  /**
   * Implicit sender reference; resolves to testActor.
   */
  implicit def self: ActorRef = this.testActor
}
/**
 * Supplies an implicit Timeout taken from the TestKit settings.
 * Mix it into test classes to get the same timeout everywhere.
 */
trait DefaultTimeout extends TestKitBase {
  /**
   * Implicit timeout, read from the DefaultTimeout entry of testKitSettings.
   */
  implicit val timeout: Timeout = this.testKitSettings.DefaultTimeout
}

Support for scaling time in test scenarios.
/**
 * Extension wrapper that adds time dilation to FiniteDuration.
 * Durations are stretched by the test time factor of the actor system in scope.
 * @param duration Original duration to scale
 */
implicit class TestDuration(val duration: FiniteDuration) extends AnyVal {
  /**
   * Scale the wrapped duration by the configured TestTimeFactor.
   * @param system Implicit ActorSystem whose TestKit settings supply the factor
   * @return Scaled duration, rounded to the nearest nanosecond
   */
  def dilated(implicit system: ActorSystem): FiniteDuration = {
    val originalNanos = duration.toNanos
    val factor = TestKitExtension(system).TestTimeFactor
    // Adding 0.5 before truncating rounds to the nearest whole nanosecond.
    val scaledNanos = (originalNanos * factor + 0.5).toLong
    Duration.fromNanos(scaledNanos)
  }
}

Usage Examples:
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{EventFilter, ImplicitSender, SocketUtil, TestActors, TestDuration, TestException, TestKit, TestKitExtension, TestProbe}
import java.net.InetSocketAddress
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
/**
 * Walk-through of the basic TestKit utilities: TestActors, SocketUtil,
 * TestException, ImplicitSender and time dilation.
 *
 * ImplicitSender is mixed in because several examples assert on replies with
 * expectMsg; without it, messages sent with `!` carry no sender and the replies
 * never reach testActor.
 *
 * NOTE(review): the `should` / `in` syntax, `assert` and `intercept` come from a
 * ScalaTest word-spec style trait that this snippet does not mix in - add the one
 * matching your ScalaTest version.
 */
class TestUtilitiesExample extends TestKit(ActorSystem("test")) with ImplicitSender {
  "TestActors" should {
    "provide EchoActor for message echoing" in {
      val echo = system.actorOf(TestActors.echoActorProps)
      echo ! "hello"
      expectMsg("hello")
      echo ! 42
      expectMsg(42)
      echo ! List(1, 2, 3)
      expectMsg(List(1, 2, 3))
    }
    "provide BlackholeActor for message ignoring" in {
      val blackhole = system.actorOf(TestActors.blackholeProps)
      // Messages are ignored - no response expected
      blackhole ! "message1"
      blackhole ! "message2"
      blackhole ! "message3"
      expectNoMessage(100.millis)
    }
    "provide ForwardActor for message forwarding" in {
      val probe = TestProbe()
      val forwarder = system.actorOf(TestActors.forwardActorProps(probe.ref))
      forwarder ! "forwarded-message"
      probe.expectMsg("forwarded-message")
      // Original sender is maintained, so the probe's reply comes back to testActor
      probe.reply("response")
      expectMsg("response")
    }
  }
  "SocketUtil" should {
    "provide free ports for network testing" in {
      // Get free TCP port
      val tcpPort = SocketUtil.temporaryLocalPort()
      assert(tcpPort > 0 && tcpPort < 65536)
      // Get free UDP port
      val udpPort = SocketUtil.temporaryLocalPort(udp = true)
      assert(udpPort > 0 && udpPort < 65536)
      // Get port free for both protocols
      val bothPort = SocketUtil.temporaryLocalPort(SocketUtil.Both)
      assert(bothPort > 0 && bothPort < 65536)
    }
    "provide server addresses with free ports" in {
      val address = SocketUtil.temporaryServerAddress("localhost", udp = false)
      // getHostString returns the host exactly as given; getHostName may trigger a
      // reverse lookup and report a different name for a literal IP address.
      assert(address.getHostString == "localhost")
      assert(address.getPort > 0)
      // Multiple addresses
      val addresses = SocketUtil.temporaryServerAddresses(3, "127.0.0.1", udp = false)
      assert(addresses.length == 3)
      addresses.foreach { addr =>
        assert(addr.getHostString == "127.0.0.1")
        assert(addr.getPort > 0)
      }
      // All ports should be different
      val ports = addresses.map(_.getPort).toSet
      assert(ports.size == 3)
    }
  }
  "TestException" should {
    "provide exception without stack trace overhead" in {
      val exception = TestException("Test error message")
      assert(exception.getMessage == "Test error message")
      assert(exception.isInstanceOf[NoStackTrace])
      // Use in test scenarios
      intercept[TestException] {
        throw TestException("Expected test failure")
      }
    }
  }
  "ImplicitSender trait" should {
    "provide automatic sender" in {
      // When mixing in ImplicitSender, testActor is used as sender automatically
      val echo = system.actorOf(TestActors.echoActorProps)
      echo ! "test-message"
      expectMsg("test-message")
      // No need to specify sender explicitly
    }
  }
  "Time dilation" should {
    "scale timeouts based on configuration" in {
      // Original duration
      val original = 1.second
      // Dilated duration (scaled by TestTimeFactor); uses the implicit system from TestKit
      val dilated = original.dilated
      // Use dilated timeouts in tests
      within(dilated) {
        // Stand-in for test operations that should complete within the scaled time
        Thread.sleep(500) // This would work even if TestTimeFactor > 1
      }
    }
  }
}

Complex testing scenarios using multiple utilities together.
/**
 * Combines several TestKit utilities in larger scenarios.
 *
 * The actor system is configured with TestEventListener because EventFilter can
 * only observe log events when that listener is installed. ImplicitSender is mixed
 * in because the examples assert on replies with expectMsg.
 *
 * NOTE(review): the `should` / `in` syntax comes from a ScalaTest word-spec style
 * trait that this snippet does not mix in - add the one matching your ScalaTest version.
 */
class AdvancedUtilitiesExample
    extends TestKit(
      ActorSystem(
        "test",
        com.typesafe.config.ConfigFactory.parseString(
          """akka.loggers = ["akka.testkit.TestEventListener"]""")))
    with ImplicitSender {

  /**
   * Stand-in for an actor that talks to a remote endpoint; it only echoes
   * descriptive strings mentioning the configured host and port.
   */
  class NetworkActor(host: String, port: Int) extends Actor {
    def receive: Receive = {
      case "connect" =>
        // Simulate network connection
        sender() ! s"connected-to-$host:$port"
      case "disconnect" =>
        sender() ! "disconnected"
      case msg =>
        sender() ! s"sent-$msg-to-$host:$port"
    }
  }

  "Combined utilities" should {
    "support complex network testing scenarios" in {
      // Get free port for test server
      val serverPort = SocketUtil.temporaryLocalPort()
      // Create network actor with test port
      val networkActor = system.actorOf(Props(new NetworkActor("localhost", serverPort)))
      // Create echo actor for response testing
      val echo = system.actorOf(TestActors.echoActorProps)
      // Test network connection
      networkActor ! "connect"
      expectMsg(s"connected-to-localhost:$serverPort")
      // Test message forwarding: the forwarder keeps testActor as sender, so the
      // echo's reply arrives back here
      val forwarder = system.actorOf(TestActors.forwardActorProps(echo))
      forwarder ! "network-message"
      expectMsg("network-message")
    }
    "support error testing with custom exceptions" in {
      val actor = system.actorOf(Props(new Actor {
        def receive: Receive = {
          case "fail" => throw TestException("Simulated failure")
          case msg => sender() ! s"processed-$msg"
        }
      }))
      // Normal operation
      actor ! "normal"
      expectMsg("processed-normal")
      // Error scenario with TestException. occurrences = 1 makes the filter fail
      // the test unless exactly one matching error is logged; without it the
      // filter only mutes the log and asserts nothing.
      EventFilter[TestException](message = "Simulated failure", occurrences = 1).intercept {
        actor ! "fail"
      }
    }
  }
}

Examples of using TestKit configuration and extensions.
/**
 * Shows how to read TestKit settings and how to override them with custom
 * configuration.
 *
 * NOTE(review): the `should` / `in` syntax comes from a ScalaTest word-spec style
 * trait that this snippet does not mix in - add the one matching your ScalaTest version.
 */
class ConfigurationUsageExample extends TestKit(ActorSystem("test")) {
  "TestKit configuration" should {
    "provide access to test settings" in {
      val settings = TestKitExtension(system)
      println(s"Test time factor: ${settings.TestTimeFactor}")
      println(s"Default timeout: ${settings.DefaultTimeout}")
      println(s"Single expect timeout: ${settings.SingleExpectDefaultTimeout}")
      // Use settings in test logic
      val scaledTimeout = (1.second.toNanos * settings.TestTimeFactor).nanos
      within(scaledTimeout) {
        // Perform time-sensitive test operations
      }
    }
    "support custom configuration" in {
      import com.typesafe.config.ConfigFactory
      val customConfig = ConfigFactory.parseString("""
akka.test {
timefactor = 2.0
single-expect-default = 10s
default-timeout = 15s
}
""")
      val customSystem = ActorSystem("custom-test", customConfig)
      // Shut the extra system down even when an assertion fails, so a failing
      // run does not leave its threads behind.
      try {
        val customSettings = TestKitExtension(customSystem)
        assert(customSettings.TestTimeFactor == 2.0)
        assert(customSettings.SingleExpectDefaultTimeout == 10.seconds)
        assert(customSettings.DefaultTimeout.duration == 15.seconds)
      } finally {
        TestKit.shutdownActorSystem(customSystem)
      }
    }
  }
}

Guidelines for effective use of TestKit utilities.
// Guideline snippets: `system`, `probe` and `startTestServer` are assumed to be
// provided by the surrounding test and are not defined here.
// GOOD: Use appropriate test actors for different scenarios
val echo = system.actorOf(TestActors.echoActorProps) // For response testing
val blackhole = system.actorOf(TestActors.blackholeProps) // For fire-and-forget testing
val forwarder = system.actorOf(TestActors.forwardActorProps(probe.ref)) // For routing testing
// GOOD: Get free ports for network tests to avoid conflicts
val port = SocketUtil.temporaryLocalPort()
val server = startTestServer(port)
// GOOD: Use TestException for expected test failures
def simulateError(): Unit = throw TestException("Expected test error")
// GOOD: Scale timeouts appropriately (dilated needs an implicit ActorSystem in scope)
val timeout = 5.seconds.dilated // Scales with TestTimeFactor
// GOOD: Mix in helpful traits
class MyTest extends TestKit(ActorSystem("test")) with ImplicitSender with DefaultTimeout
// AVOID: Hardcoded ports that might conflict
// val server = startTestServer(8080) // BAD - might be in use
// AVOID: Using real exceptions for expected test failures
// throw new RuntimeException("test") // BAD - generates unnecessary stack traces

Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-testkit-2-12