Akka TestKit provides various utility classes for configuration management, networking, serialization, and timing that support the core testing functionality. These utilities help create comprehensive test environments and handle common testing scenarios.
object TestKitExtension extends ExtensionId[TestKitSettings] {
def get(system: ActorSystem): TestKitSettings
def get(system: ClassicActorSystemProvider): TestKitSettings
def createExtension(system: ExtendedActorSystem): TestKitSettings
}

Extension system for accessing TestKit configuration settings.
class TestKitSettings(config: Config) {
val TestTimeFactor: Double
val SingleExpectDefaultTimeout: FiniteDuration
val TestEventFilterLeeway: FiniteDuration
val DefaultTimeout: Timeout
}

Configuration holder containing all TestKit-related settings loaded from configuration files.
import akka.testkit.{TestKitExtension, TestKitSettings}
/**
 * Looks up the TestKit settings of the actor system under test, prints them,
 * and uses the single-expect timeout as the bound of a `within` block.
 */
class ConfigurationTest extends TestKit(ActorSystem("TestSystem")) with ImplicitSender {

  // Settings resolved through the TestKit extension for this actor system.
  val settings: TestKitSettings = TestKitExtension(system)

  "TestKit settings" should {
    "provide access to configuration values" in {
      println(s"Test time factor: ${settings.TestTimeFactor}")
      println(s"Default timeout: ${settings.SingleExpectDefaultTimeout}")
      println(s"Event filter leeway: ${settings.TestEventFilterLeeway}")
      println(s"Default Timeout: ${settings.DefaultTimeout}")

      // Use settings in tests
      within(settings.SingleExpectDefaultTimeout) {
        // testActor only queues what it receives and never answers, so the
        // message to expect is the one that was just sent to it.
        testActor ! "ping"
        expectMsg("ping")
      }
    }
  }
}

import com.typesafe.config.ConfigFactory
// Custom test configuration
// Overrides akka.test.timefactor, single-expect-default, filter-leeway and
// default-timeout; the values are read back through TestKitExtension below.
val testConfig = ConfigFactory.parseString("""
akka {
test {
# Scale factor for test timeouts
timefactor = 2.0
# Default timeout for single message expectations
single-expect-default = 5s
# Leeway for EventFilter operations
filter-leeway = 3s
# Default timeout for operations requiring implicit Timeout
default-timeout = 10s
}
}
""")
// Create an actor system from the custom configuration and look up its TestKit settings.
val system = ActorSystem("CustomTestSystem", testConfig)
val settings = TestKitExtension(system)
// Settings reflect custom configuration
assert(settings.TestTimeFactor == 2.0)
assert(settings.SingleExpectDefaultTimeout == 5.seconds)

/**
 * Scales a base timeout by the configured test time factor, once by hand and
 * once through the `dilated` extension method, and checks that both agree.
 */
class TimeFactorTest extends TestKit(ActorSystem("TestSystem")) with ImplicitSender {
  "Time factor" should {
    "scale test durations automatically" in {
      val settings = TestKitExtension(system)
      val baseTimeout = 1.second

      // Manual scaling
      val scaledTimeout = Duration.fromNanos(
        (baseTimeout.toNanos * settings.TestTimeFactor).toLong
      )

      // Or use the TestDuration implicit class. It is declared in the
      // akka.testkit package object, so the package members have to be
      // imported for the implicit conversion to be in scope.
      import akka.testkit._
      val dilatedTimeout = baseTimeout.dilated
      assert(scaledTimeout == dilatedTimeout)

      // Use in actual test
      within(dilatedTimeout) {
        someSlowActor ! "slow-operation"
        expectMsg("completed")
      }
    }
  }
}

// In akka.testkit package object
implicit class TestDuration(val duration: FiniteDuration) extends AnyVal {
def dilated(implicit system: ActorSystem): FiniteDuration
}

Provides automatic time scaling for test durations based on the configured time factor.
// TestDuration is an implicit class in the akka.testkit package object, so the
// package members are imported to make `.dilated` available.
import akka.testkit._
import scala.concurrent.duration._

/**
 * Dilates three base durations with the configured time factor, prints the
 * before/after values, and uses the dilated values as expectation timeouts.
 *
 * ImplicitSender is mixed in so that a reply to the request is addressed to
 * testActor, where `expectMsg` looks for it.
 */
class DurationScalingTest extends TestKit(ActorSystem("TestSystem")) with ImplicitSender {
  "Duration scaling" should {
    "automatically adjust timeouts" in {
      // Base durations
      val shortTimeout = 100.millis
      val mediumTimeout = 1.second
      val longTimeout = 5.seconds

      // Scaled versions (uses TestTimeFactor from config)
      val scaledShort = shortTimeout.dilated
      val scaledMedium = mediumTimeout.dilated
      val scaledLong = longTimeout.dilated

      println(s"Short: ${shortTimeout} -> ${scaledShort}")
      println(s"Medium: ${mediumTimeout} -> ${scaledMedium}")
      println(s"Long: ${longTimeout} -> ${scaledLong}")

      // Use in tests
      within(scaledMedium) {
        slowActor ! "request"
        expectMsg(scaledShort, "response")
      }
    }
  }
}

/**
 * Applies dilation only when the configured time factor differs from 1.0 and
 * uses the resulting duration as the bound of a `within` block.
 */
class ConditionalScalingTest extends TestKit(ActorSystem("TestSystem")) {
  "Conditional scaling" should {
    "scale only when needed" in {
      val baseTimeout = 2.seconds
      val settings = TestKitExtension(system)

      // Only scale if time factor is not 1.0
      val effectiveTimeout =
        if (settings.TestTimeFactor != 1.0) baseTimeout.dilated
        else baseTimeout

      within(effectiveTimeout) {
        // testActor queues the message it is sent and does not reply, so the
        // expectation is on that same message.
        testActor ! "message"
        expectMsg("message")
      }
    }
  }
}

object SocketUtil {
def temporaryLocalPort(udp: Boolean = false): Int
def temporaryLocalPort(protocol: Protocol): Int
def temporaryServerAddress(address: String = "127.0.0.1", udp: Boolean = false): InetSocketAddress
def temporaryServerAddresses(numberOfAddresses: Int, hostname: String = "127.0.0.1", udp: Boolean = false): immutable.IndexedSeq[InetSocketAddress]
sealed trait Protocol
case object Tcp extends Protocol
case object Udp extends Protocol
case object Both extends Protocol
}

Utilities for obtaining temporary network ports and addresses for testing network-based actors and services.
import akka.testkit.SocketUtil
import java.net.InetSocketAddress
/**
 * Exercises the SocketUtil helpers for temporary ports and temporary server
 * addresses and prints what they return.
 */
class NetworkTest extends TestKit(ActorSystem("TestSystem")) {

  // A usable port number lies in the range 1..65535.
  private def isValidPort(port: Int): Boolean = port > 0 && port <= 65535

  "SocketUtil" should {
    "provide temporary ports for testing" in {
      // Get temporary TCP port
      val tcpPort = SocketUtil.temporaryLocalPort(udp = false)
      println(s"TCP port: $tcpPort")

      // Get temporary UDP port
      val udpPort = SocketUtil.temporaryLocalPort(udp = true)
      println(s"UDP port: $udpPort")

      // Get port by protocol
      val tcpPort2 = SocketUtil.temporaryLocalPort(SocketUtil.Tcp)
      val udpPort2 = SocketUtil.temporaryLocalPort(SocketUtil.Udp)

      // Independent calls are not guaranteed to return different numbers: TCP
      // and UDP have separate port spaces, and a port obtained by one call may
      // be handed out again by a later one. Only validity is asserted here.
      Seq(tcpPort, udpPort, tcpPort2, udpPort2).foreach { port =>
        assert(isValidPort(port))
      }
    }

    "provide temporary addresses" in {
      // Get temporary server address
      val address = SocketUtil.temporaryServerAddress()
      println(s"Address: ${address.getHostString}:${address.getPort}")

      // Custom hostname
      val customAddress = SocketUtil.temporaryServerAddress("localhost", udp = false)
      assert(customAddress.getHostString == "localhost")

      // Multiple addresses
      val addresses = SocketUtil.temporaryServerAddresses(3, "127.0.0.1", udp = false)
      assert(addresses.length == 3)
      addresses.foreach(addr => println(s"Address: ${addr}"))
    }
  }
}

import akka.actor.{Actor, Props}
import akka.io.{IO, Tcp}
import java.net.InetSocketAddress
/**
 * Binds a TCP listener to a temporary local port on request and reports the
 * outcome to the actor that asked for it.
 *
 * Protocol:
 *  - `"bind"`: start binding; the sender of this message receives the result.
 *  - result: `"bound:<port>"` on success, `"failed:<command>"` on failure.
 */
class NetworkActor extends Actor {
  import Tcp._
  import context.system

  def receive: Receive = idle

  // Waiting for a bind request.
  private def idle: Receive = {
    case "bind" =>
      val port = SocketUtil.temporaryLocalPort()
      val address = new InetSocketAddress("localhost", port)
      IO(Tcp) ! Bind(self, address)
      // Remember who asked: when Bound/CommandFailed arrive, sender() is the
      // IO layer, not the requester, so the requester must be captured now.
      context.become(binding(sender()))
  }

  // Waiting for the IO layer to confirm or reject the pending bind.
  private def binding(replyTo: akka.actor.ActorRef): Receive = {
    case Bound(localAddress) =>
      replyTo ! s"bound:${localAddress.getPort}"
      context.become(idle)
    case CommandFailed(cmd) =>
      replyTo ! s"failed:${cmd}"
      context.become(idle)
  }
}
/**
 * Asks a NetworkActor to bind and checks that a "bound:<port>" reply with a
 * positive port number arrives within five seconds.
 */
class NetworkActorTest extends TestKit(ActorSystem("TestSystem")) with ImplicitSender {
  "NetworkActor" should {
    "bind to temporary port" in {
      val actor = system.actorOf(Props[NetworkActor])
      actor ! "bind"

      // Expect successful binding with port number
      val response = expectMsgPF(5.seconds) {
        case msg: String if msg.startsWith("bound:") => msg
      }

      // The partial function above only accepts strings with this prefix.
      val port = response.stripPrefix("bound:").toInt
      assert(port > 0)
      println(s"Successfully bound to port: $port")
    }
  }
}

/**
 * Reserves one address per node and checks that every node got its own port.
 */
class MultiNodeTest extends TestKit(ActorSystem("TestSystem")) {
  "Multi-node setup" should {
    "allocate unique ports for each node" in {
      val nodeCount = 3

      // Ask for the whole batch in one call. Separate temporaryLocalPort()
      // calls each give their port back before the next one is chosen, so
      // they may repeat a number and make the uniqueness check flaky.
      val addresses = SocketUtil.temporaryServerAddresses(nodeCount)
      val ports = addresses.map(_.getPort)

      // All ports should be unique
      assert(ports.distinct.length == nodeCount)

      addresses.zipWithIndex.foreach { case (addr, index) =>
        println(s"Node $index: ${addr}")
      }

      // Use addresses to configure cluster nodes or test network communication
      // ...
    }
  }
}

class TestMessageSerializer(system: ExtendedActorSystem) extends Serializer {
def identifier: Int
def includeManifest: Boolean
def toBinary(obj: AnyRef): Array[Byte]
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef
}

A serializer designed for test messages that uses Java serialization for simplicity.
trait JavaSerializable extends Serializable

Marker trait indicating that a message should use Java serialization in test environments.
import akka.serialization.SerializationExtension
// Test message with JavaSerializable marker
case class TestMessage(data: String, count: Int) extends JavaSerializable
/**
 * Round-trips a TestMessage through whichever serializer the actor system
 * selects for it and checks that the restored value equals the original.
 */
class SerializationTest extends TestKit(ActorSystem("TestSystem")) {
  "Message serialization" should {
    "serialize and deserialize test messages" in {
      val sent = TestMessage("test-data", 42)
      val codec = SerializationExtension(system).findSerializerFor(sent)

      // Serialize
      val payload = codec.toBinary(sent)
      println(s"Serialized to ${payload.length} bytes")

      // Deserialize, passing the message class as the manifest
      val received = codec.fromBinary(payload, Some(sent.getClass))

      // Verify
      assert(received == sent)
      println(s"Successfully round-tripped: $received")
    }
  }
}

// Configure custom serializer in test configuration
// Registers TestMessageSerializer under the name "test" and binds it to the
// JavaSerializable marker trait.
val testConfig = ConfigFactory.parseString("""
akka {
actor {
serializers {
test = "akka.testkit.TestMessageSerializer"
}
serialization-bindings {
"akka.testkit.JavaSerializable" = test
}
}
}
""")

/**
 * Checks that, with the configuration above, a JavaSerializable message is
 * handled by TestMessageSerializer and survives a serialization round trip.
 */
class CustomSerializationTest extends TestKit(ActorSystem("TestSystem", testConfig)) {
  "Custom serialization" should {
    "use TestMessageSerializer for JavaSerializable messages" in {
      val original = TestMessage("serialized", 123)
      val chosen = SerializationExtension(system).findSerializerFor(original)
      assert(chosen.isInstanceOf[TestMessageSerializer])

      // Test serialization roundtrip
      val roundTripped = chosen.fromBinary(chosen.toBinary(original), Some(original.getClass))
      assert(roundTripped == original)
    }
  }
}

import com.typesafe.config.{Config, ConfigFactory}
/** Factory methods for the configurations used by the tests. */
object TestConfiguration {

  // Resource holding the settings shared by every test environment.
  private val BaseResource = "application-test.conf"

  /**
   * Builds the configuration for one environment.
   *
   * Precedence, highest first: `application-test-<env>.conf`, then
   * `application-test.conf`, then the library defaults.
   *
   * The two files are parsed (not loaded) and layered first; only the combined
   * result is passed to `ConfigFactory.load`. Loading each file separately
   * would put the reference defaults into the environment config, where they
   * would shadow settings that only the base file defines.
   *
   * @param env environment name used to pick `application-test-<env>.conf`
   * @return the resolved configuration for that environment
   */
  def forEnvironment(env: String): Config = {
    val envOverrides = ConfigFactory.parseResourcesAnySyntax(s"application-test-$env.conf")
    val baseSettings = ConfigFactory.parseResourcesAnySyntax(BaseResource)
    ConfigFactory.load(envOverrides.withFallback(baseSettings))
  }

  /** Configuration that sets the default dispatcher type to the CallingThreadDispatcher configurator. */
  def withCustomDispatcher(): Config = {
    ConfigFactory.parseString("""
akka {
actor {
default-dispatcher {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
}
}
""")
  }

  /** Configuration that sets the scheduler implementation to ExplicitlyTriggeredScheduler. */
  def withTestScheduler(): Config = {
    ConfigFactory.parseString("""
akka {
scheduler {
implementation = akka.testkit.ExplicitlyTriggeredScheduler
}
}
""")
  }

  /**
   * Combines the dispatcher and scheduler overrides on top of the loaded
   * `application-test.conf`; earlier entries in the chain take precedence.
   */
  def deterministicTestConfig(): Config = {
    withCustomDispatcher()
      .withFallback(withTestScheduler())
      .withFallback(ConfigFactory.load(BaseResource))
  }
}
// Usage in tests
/**
 * Runs against an actor system built from
 * `TestConfiguration.deterministicTestConfig()`.
 *
 * ImplicitSender is mixed in so that the actor's reply is addressed to
 * testActor; without it the message is sent with no sender and `expectMsg`
 * never sees an answer.
 */
class ConfigurableTest
    extends TestKit(ActorSystem("TestSystem", TestConfiguration.deterministicTestConfig()))
    with ImplicitSender {
  "Configurable test" should {
    "use deterministic execution" in {
      // Test runs with CallingThreadDispatcher and ExplicitlyTriggeredScheduler
      val actor = system.actorOf(Props[MyActor])
      actor ! "test"
      expectMsg("response") // Deterministic execution
    }
  }
}

import scala.util.Random
/**
 * Builds actor systems whose TestKit settings are assembled at runtime from a
 * map of overrides.
 */
class DynamicConfigurationTest {

  /**
   * Creates an actor system from the default test settings overlaid with
   * `customSettings`, falling back to `application-test.conf`.
   *
   * Each entry is rendered as a `key = value` line and parsed as HOCON, so
   * values must be valid unquoted HOCON values.
   *
   * @param customSettings configuration paths and values that override the defaults
   * @return a new actor system; the caller is responsible for shutting it down
   */
  def createTestSystem(customSettings: Map[String, Any] = Map.empty): ActorSystem = {
    val baseConfig = Map(
      "akka.test.timefactor" -> 1.0,
      "akka.test.single-expect-default" -> "3s",
      "akka.test.filter-leeway" -> "3s"
    )

    // Entries on the right win, so custom settings replace the defaults.
    val finalSettings = baseConfig ++ customSettings
    val configString = finalSettings.map {
      case (key, value) => s"$key = $value"
    }.mkString("\n")

    val config = ConfigFactory.parseString(configString)
      .withFallback(ConfigFactory.load("application-test.conf"))

    ActorSystem(s"TestSystem-${Random.nextInt(1000)}", config)
  }

  /** Checks that overrides passed to `createTestSystem` show up in the TestKit settings. */
  @Test
  def testWithCustomTimeout(): Unit = {
    val system = createTestSystem(Map(
      "akka.test.single-expect-default" -> "10s",
      "akka.test.timefactor" -> 2.0
    ))

    // Shut the system down in `finally` so a failing assertion cannot leak it.
    try {
      new TestKit(system) {
        val settings = TestKitExtension(system)
        assert(settings.SingleExpectDefaultTimeout == 10.seconds)
        assert(settings.TestTimeFactor == 2.0)
      }
    } finally {
      TestKit.shutdownActorSystem(system)
    }
  }
}

// Good: Well-documented test configuration
akka {
test {
# Scale timeouts by 2x for slower CI environments
timefactor = 2.0
# Longer default timeout for integration tests
single-expect-default = 5s
# Allow more time for event filter operations
filter-leeway = 3s
}
# Use deterministic dispatcher for unit tests
actor.default-dispatcher.type = akka.testkit.CallingThreadDispatcherConfigurator
# Suppress expected log messages
loggers = ["akka.testkit.TestEventListener"]
loglevel = "WARNING"
log-dead-letters = off
}// Good: Proper network resource management
/**
 * Records every temporary port handed out during a test case and prints the
 * list once the test case has finished.
 */
class NetworkResourceTest extends TestKit(ActorSystem("TestSystem")) with BeforeAndAfterEach {

  // Ports handed out in the current test case, most recent first.
  var allocatedPorts: List[Int] = Nil

  // Start every test case with an empty record.
  override def beforeEach(): Unit = {
    allocatedPorts = Nil
  }

  override def afterEach(): Unit = {
    // Log allocated ports for debugging
    if (allocatedPorts.nonEmpty)
      println(s"Test used ports: ${allocatedPorts.mkString(", ")}")
  }

  /** Obtains a temporary local port and records it before returning it. */
  def getTestPort(): Int = {
    val freshPort = SocketUtil.temporaryLocalPort()
    allocatedPorts ::= freshPort
    freshPort
  }
}

// Good: Comprehensive serialization testing
case class LargeTestMessage(data: Array[Byte]) extends JavaSerializable
/**
 * Times a serialization round trip of a 1 MiB message and checks both the
 * restored content and that the round trip stays under one second.
 */
class SerializationPerformanceTest extends TestKit(ActorSystem("TestSystem")) {
  "Serialization performance" should {
    "handle large messages efficiently" in {
      val payload = Array.fill(1024 * 1024)(0.toByte) // 1MB
      val message = LargeTestMessage(payload)
      val serialization = SerializationExtension(system)

      // Timed section: serializer lookup, serialization and deserialization.
      val startedAt = System.nanoTime()
      val serializer = serialization.findSerializerFor(message)
      val bytes = serializer.toBinary(message)
      val restored = serializer.fromBinary(bytes, Some(message.getClass))
      val elapsedNanos = System.nanoTime() - startedAt

      val durationMs = elapsedNanos / 1000000
      println(s"Serialization roundtrip took ${durationMs}ms for ${bytes.length} bytes")

      // Arrays compare by reference, so the content is compared element-wise.
      val restoredData = restored.asInstanceOf[LargeTestMessage].data
      assert(restoredData.sameElements(payload))
      assert(durationMs < 1000) // Should complete within 1 second
    }
  }
}