or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

actor-references.mdcore-testing.mddeterministic-execution.mdevent-filtering.mdindex.mdjava-api.mdtest-utilities.mdutilities-config.md
tile.json

utilities-config.mddocs/

Utility Classes and Configuration

Akka TestKit provides various utility classes for configuration management, networking, serialization, and timing that support the core testing functionality. These utilities help create comprehensive test environments and handle common testing scenarios.

Configuration System

TestKitExtension { .api }

object TestKitExtension extends ExtensionId[TestKitSettings] {
  def get(system: ActorSystem): TestKitSettings
  def get(system: ClassicActorSystemProvider): TestKitSettings
  def createExtension(system: ExtendedActorSystem): TestKitSettings
}

Extension system for accessing TestKit configuration settings.

TestKitSettings { .api }

class TestKitSettings(config: Config) {
  val TestTimeFactor: Double
  val SingleExpectDefaultTimeout: FiniteDuration
  val TestEventFilterLeeway: FiniteDuration
  val DefaultTimeout: Timeout
}

Configuration holder containing all TestKit-related settings loaded from configuration files.

Usage Examples

Accessing TestKit Settings

import akka.testkit.{TestKitExtension, TestKitSettings}

class ConfigurationTest extends TestKit(ActorSystem("TestSystem")) with ImplicitSender {
  
  val settings: TestKitSettings = TestKitExtension(system)
  
  "TestKit settings" should {
    "provide access to configuration values" in {
      println(s"Test time factor: ${settings.TestTimeFactor}")
      println(s"Default timeout: ${settings.SingleExpectDefaultTimeout}")
      println(s"Event filter leeway: ${settings.TestEventFilterLeeway}")
      println(s"Default Timeout: ${settings.DefaultTimeout}")
      
      // Use settings in tests
      within(settings.SingleExpectDefaultTimeout) {
        testActor ! "ping"
        expectMsg("pong")
      }
    }
  }
}

Custom Configuration

import com.typesafe.config.ConfigFactory

// Custom test configuration
val testConfig = ConfigFactory.parseString("""
  akka {
    test {
      # Scale factor for test timeouts
      timefactor = 2.0
      
      # Default timeout for single message expectations
      single-expect-default = 5s
      
      # Leeway for EventFilter operations  
      filter-leeway = 3s
      
      # Default timeout for operations requiring implicit Timeout
      default-timeout = 10s
    }
  }
""")

val system = ActorSystem("CustomTestSystem", testConfig)
val settings = TestKitExtension(system)

// Settings reflect custom configuration
assert(settings.TestTimeFactor == 2.0)
assert(settings.SingleExpectDefaultTimeout == 5.seconds)

Time Factor Usage

class TimeFactorTest extends TestKit(ActorSystem("TestSystem")) with ImplicitSender {
  
  "Time factor" should {
    "scale test durations automatically" in {
      val settings = TestKitExtension(system)
      val baseTimeout = 1.second
      
      // Manual scaling
      val scaledTimeout = Duration.fromNanos(
        (baseTimeout.toNanos * settings.TestTimeFactor).toLong
      )
      
      // Or use TestDuration implicit class
      import akka.testkit.TestDuration._
      val dilatedTimeout = baseTimeout.dilated
      
      assert(scaledTimeout == dilatedTimeout)
      
      // Use in actual test
      within(dilatedTimeout) {
        someSlowActor ! "slow-operation"
        expectMsg("completed")
      }
    }
  }
}

Timing Utilities

TestDuration Implicit Class { .api }

// In akka.testkit package object
implicit class TestDuration(val duration: FiniteDuration) extends AnyVal {
  def dilated(implicit system: ActorSystem): FiniteDuration
}

Provides automatic time scaling for test durations based on the configured time factor.

Usage Examples

Duration Scaling

import akka.testkit.TestDuration._
import scala.concurrent.duration._

class DurationScalingTest extends TestKit(ActorSystem("TestSystem")) {
  
  "Duration scaling" should {
    "automatically adjust timeouts" in {
      // Base durations
      val shortTimeout = 100.millis
      val mediumTimeout = 1.second
      val longTimeout = 5.seconds
      
      // Scaled versions (uses TestTimeFactor from config)
      val scaledShort = shortTimeout.dilated
      val scaledMedium = mediumTimeout.dilated  
      val scaledLong = longTimeout.dilated
      
      println(s"Short: ${shortTimeout} -> ${scaledShort}")
      println(s"Medium: ${mediumTimeout} -> ${scaledMedium}")
      println(s"Long: ${longTimeout} -> ${scaledLong}")
      
      // Use in tests
      within(scaledMedium) {
        slowActor ! "request"
        expectMsg(scaledShort, "response")
      }
    }
  }
}

Conditional Scaling

class ConditionalScalingTest extends TestKit(ActorSystem("TestSystem")) {
  
  "Conditional scaling" should {
    "scale only when needed" in {
      val baseTimeout = 2.seconds
      val settings = TestKitExtension(system)
      
      // Only scale if time factor is not 1.0
      val effectiveTimeout = if (settings.TestTimeFactor != 1.0) {
        baseTimeout.dilated
      } else {
        baseTimeout
      }
      
      within(effectiveTimeout) {
        testActor ! "message"
        expectMsg("response")
      }
    }
  }
}

Network Utilities

SocketUtil { .api }

object SocketUtil {
  def temporaryLocalPort(udp: Boolean = false): Int
  def temporaryLocalPort(protocol: Protocol): Int
  def temporaryServerAddress(address: String = "127.0.0.1", udp: Boolean = false): InetSocketAddress
  def temporaryServerAddresses(numberOfAddresses: Int, hostname: String = "127.0.0.1", udp: Boolean = false): immutable.IndexedSeq[InetSocketAddress]
  
  sealed trait Protocol
  case object Tcp extends Protocol
  case object Udp extends Protocol  
  case object Both extends Protocol
}

Utilities for obtaining temporary network ports and addresses for testing network-based actors and services.

Usage Examples

Basic Port Allocation

import akka.testkit.SocketUtil
import java.net.InetSocketAddress

class NetworkTest extends TestKit(ActorSystem("TestSystem")) {
  
  "SocketUtil" should {
    "provide temporary ports for testing" in {
      // Get temporary TCP port
      val tcpPort = SocketUtil.temporaryLocalPort(udp = false)
      println(s"TCP port: $tcpPort")
      
      // Get temporary UDP port
      val udpPort = SocketUtil.temporaryLocalPort(udp = true)
      println(s"UDP port: $udpPort")
      
      // Get port by protocol
      val tcpPort2 = SocketUtil.temporaryLocalPort(SocketUtil.Tcp)
      val udpPort2 = SocketUtil.temporaryLocalPort(SocketUtil.Udp)
      
      // All ports should be different
      assert(tcpPort != udpPort)
      assert(tcpPort != tcpPort2)
      assert(udpPort != udpPort2)
    }
    
    "provide temporary addresses" in {
      // Get temporary server address
      val address = SocketUtil.temporaryServerAddress()
      println(s"Address: ${address.getHostString}:${address.getPort}")
      
      // Custom hostname
      val customAddress = SocketUtil.temporaryServerAddress("localhost", udp = false)
      assert(customAddress.getHostString == "localhost")
      
      // Multiple addresses
      val addresses = SocketUtil.temporaryServerAddresses(3, "127.0.0.1", udp = false)
      assert(addresses.length == 3)
      addresses.foreach(addr => println(s"Address: ${addr}"))
    }
  }
}

Testing Network Actors

import akka.actor.{Actor, Props}
import akka.io.{IO, Tcp}
import java.net.InetSocketAddress

class NetworkActor extends Actor {
  import Tcp._
  import context.system
  
  def receive = {
    case "bind" =>
      val port = SocketUtil.temporaryLocalPort()
      val address = new InetSocketAddress("localhost", port)
      IO(Tcp) ! Bind(self, address)
      
    case b @ Bound(localAddress) =>
      sender() ! s"bound:${localAddress.getPort}"
      
    case CommandFailed(cmd) =>
      sender() ! s"failed:${cmd}"
  }
}

class NetworkActorTest extends TestKit(ActorSystem("TestSystem")) with ImplicitSender {
  
  "NetworkActor" should {
    "bind to temporary port" in {
      val actor = system.actorOf(Props[NetworkActor])
      
      actor ! "bind"
      
      // Expect successful binding with port number
      val response = expectMsgPF(5.seconds) {
        case msg: String if msg.startsWith("bound:") => msg
      }
      
      val port = response.split(":")(1).toInt
      assert(port > 0)
      println(s"Successfully bound to port: $port")
    }
  }
}

Multi-Node Testing Setup

class MultiNodeTest extends TestKit(ActorSystem("TestSystem")) {
  
  "Multi-node setup" should {
    "allocate unique ports for each node" in {
      val nodeCount = 3
      val ports = (1 to nodeCount).map(_ => SocketUtil.temporaryLocalPort())
      
      // All ports should be unique
      assert(ports.distinct.length == nodeCount)
      
      // Create addresses for each node
      val addresses = ports.map(port => new InetSocketAddress("127.0.0.1", port))
      
      addresses.zipWithIndex.foreach { case (addr, index) =>
        println(s"Node $index: ${addr}")
      }
      
      // Use addresses to configure cluster nodes or test network communication
      // ...
    }
  }
}

Serialization Utilities

TestMessageSerializer { .api }

class TestMessageSerializer(system: ExtendedActorSystem) extends Serializer {
  def identifier: Int
  def includeManifest: Boolean
  def toBinary(obj: AnyRef): Array[Byte]
  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef
}

A serializer designed for test messages that uses Java serialization for simplicity.

JavaSerializable { .api }

trait JavaSerializable extends Serializable

Marker trait indicating that a message should use Java serialization in test environments.

Usage Examples

Testing Message Serialization

import akka.serialization.SerializationExtension

// Test message with JavaSerializable marker
case class TestMessage(data: String, count: Int) extends JavaSerializable

class SerializationTest extends TestKit(ActorSystem("TestSystem")) {
  
  "Message serialization" should {
    "serialize and deserialize test messages" in {
      val serialization = SerializationExtension(system)
      val originalMessage = TestMessage("test-data", 42)
      
      // Serialize
      val serializer = serialization.findSerializerFor(originalMessage)
      val bytes = serializer.toBinary(originalMessage)
      println(s"Serialized to ${bytes.length} bytes")
      
      // Deserialize
      val deserializedMessage = serializer.fromBinary(bytes, Some(originalMessage.getClass))
      
      // Verify
      assert(deserializedMessage == originalMessage)
      println(s"Successfully round-tripped: $deserializedMessage")
    }
  }
}

Custom Test Serializer Configuration

// Configure custom serializer in test configuration
val testConfig = ConfigFactory.parseString("""
  akka {
    actor {
      serializers {
        test = "akka.testkit.TestMessageSerializer"
      }
      serialization-bindings {
        "akka.testkit.JavaSerializable" = test
      }
    }
  }
""")

class CustomSerializationTest extends TestKit(ActorSystem("TestSystem", testConfig)) {
  
  "Custom serialization" should {
    "use TestMessageSerializer for JavaSerializable messages" in {
      val serialization = SerializationExtension(system)
      val message = TestMessage("serialized", 123)
      
      val serializer = serialization.findSerializerFor(message)
      assert(serializer.isInstanceOf[TestMessageSerializer])
      
      // Test serialization roundtrip
      val bytes = serializer.toBinary(message)
      val restored = serializer.fromBinary(bytes, Some(message.getClass))
      assert(restored == message)
    }
  }
}

Configuration Management

Test Configuration Patterns

Environment-Specific Configuration

import com.typesafe.config.{Config, ConfigFactory}

object TestConfiguration {
  
  def forEnvironment(env: String): Config = {
    val baseConfig = ConfigFactory.load("application-test.conf")
    val envConfig = ConfigFactory.load(s"application-test-$env.conf")
    envConfig.withFallback(baseConfig)
  }
  
  def withCustomDispatcher(): Config = {
    ConfigFactory.parseString("""
      akka {
        actor {
          default-dispatcher {
            type = akka.testkit.CallingThreadDispatcherConfigurator
          }
        }
      }
    """)
  }
  
  def withTestScheduler(): Config = {
    ConfigFactory.parseString("""
      akka {
        scheduler {
          implementation = akka.testkit.ExplicitlyTriggeredScheduler
        }
      }
    """)
  }
  
  def deterministicTestConfig(): Config = {
    withCustomDispatcher()
      .withFallback(withTestScheduler())
      .withFallback(ConfigFactory.load("application-test.conf"))
  }
}

// Usage in tests
class ConfigurableTest extends TestKit(ActorSystem("TestSystem", TestConfiguration.deterministicTestConfig())) {
  
  "Configurable test" should {
    "use deterministic execution" in {
      // Test runs with CallingThreadDispatcher and ExplicitlyTriggeredScheduler
      val actor = system.actorOf(Props[MyActor])
      actor ! "test"
      expectMsg("response")  // Deterministic execution
    }
  }
}

Dynamic Configuration

import scala.util.Random

class DynamicConfigurationTest {
  
  def createTestSystem(customSettings: Map[String, Any] = Map.empty): ActorSystem = {
    val baseConfig = Map(
      "akka.test.timefactor" -> 1.0,
      "akka.test.single-expect-default" -> "3s",
      "akka.test.filter-leeway" -> "3s"
    )
    
    val finalSettings = baseConfig ++ customSettings
    val configString = finalSettings.map { 
      case (key, value) => s"$key = $value"
    }.mkString("\n")
    
    val config = ConfigFactory.parseString(configString)
      .withFallback(ConfigFactory.load("application-test.conf"))
      
    ActorSystem(s"TestSystem-${Random.nextInt(1000)}", config)
  }
  
  @Test
  def testWithCustomTimeout(): Unit = {
    val system = createTestSystem(Map(
      "akka.test.single-expect-default" -> "10s",
      "akka.test.timefactor" -> 2.0
    ))
    
    new TestKit(system) {
      val settings = TestKitExtension(system)
      assert(settings.SingleExpectDefaultTimeout == 10.seconds)
      assert(settings.TestTimeFactor == 2.0)
      
      TestKit.shutdownActorSystem(system)
    }
  }
}

Best Practices

Configuration Best Practices

  1. Separate test configuration: Use dedicated test configuration files
  2. Environment-specific settings: Create configurations for different test environments
  3. Use time factors: Scale timeouts appropriately for different environments
  4. Document settings: Comment configuration values and their purposes
// Good: Well-documented test configuration
akka {
  test {
    # Scale timeouts by 2x for slower CI environments
    timefactor = 2.0
    
    # Longer default timeout for integration tests
    single-expect-default = 5s
    
    # Allow more time for event filter operations
    filter-leeway = 3s
  }
  
  # Use deterministic dispatcher for unit tests
  actor.default-dispatcher.type = akka.testkit.CallingThreadDispatcherConfigurator
  
  # Suppress expected log messages
  loggers = ["akka.testkit.TestEventListener"]
  loglevel = "WARNING"
  log-dead-letters = off
}

Network Testing Best Practices

  1. Use temporary ports: Always use SocketUtil for port allocation
  2. Clean up resources: Ensure network resources are released after tests
  3. Test port conflicts: Verify that multiple test runs don't conflict
  4. Handle binding failures: Account for port binding failures in tests
// Good: Proper network resource management
class NetworkResourceTest extends TestKit(ActorSystem("TestSystem")) with BeforeAndAfterEach {
  
  var allocatedPorts: List[Int] = List.empty
  
  override def beforeEach(): Unit = {
    allocatedPorts = List.empty
  }
  
  override def afterEach(): Unit = {
    // Log allocated ports for debugging
    if (allocatedPorts.nonEmpty) {
      println(s"Test used ports: ${allocatedPorts.mkString(", ")}")
    }
  }
  
  def getTestPort(): Int = {
    val port = SocketUtil.temporaryLocalPort()
    allocatedPorts = port :: allocatedPorts
    port
  }
}

Serialization Testing Best Practices

  1. Test serialization boundaries: Verify messages crossing serialization boundaries
  2. Use JavaSerializable for tests: Simplify test message serialization
  3. Test large messages: Verify behavior with large serialized messages
  4. Measure serialization performance: Monitor serialization overhead in tests
// Good: Comprehensive serialization testing
case class LargeTestMessage(data: Array[Byte]) extends JavaSerializable

class SerializationPerformanceTest extends TestKit(ActorSystem("TestSystem")) {
  
  "Serialization performance" should {
    "handle large messages efficiently" in {
      val serialization = SerializationExtension(system)
      val largeData = Array.fill(1024 * 1024)(0.toByte)  // 1MB
      val message = LargeTestMessage(largeData)
      
      val startTime = System.nanoTime()
      val serializer = serialization.findSerializerFor(message)
      val bytes = serializer.toBinary(message)
      val deserializedMessage = serializer.fromBinary(bytes, Some(message.getClass))
      val endTime = System.nanoTime()
      
      val durationMs = (endTime - startTime) / 1000000
      println(s"Serialization roundtrip took ${durationMs}ms for ${bytes.length} bytes")
      
      assert(deserializedMessage.asInstanceOf[LargeTestMessage].data.sameElements(largeData))
      assert(durationMs < 1000)  // Should complete within 1 second
    }
  }
}