Multi-node testing toolkit for coordinated testing of distributed Akka applications across multiple JVMs
npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-multi-node-testkit-2-12@2.8.0The Akka Multi-Node TestKit provides a comprehensive framework for testing distributed Akka applications across multiple JVMs and potentially multiple machines. It enables coordinated testing with barrier synchronization, network failure injection, and distributed system testing capabilities, making it invaluable for testing distributed systems built with Akka.
com.typesafe.akka:akka-multi-node-testkit_2.12:2.8.8"com.typesafe.akka" %% "akka-multi-node-testkit" % "2.8.8" % Testimport akka.remote.testkit.{MultiNodeConfig, MultiNodeSpec, Direction}
import akka.remote.testconductor.{TestConductor, RoleName}import akka.remote.testkit.{MultiNodeConfig, MultiNodeSpec}
import akka.remote.testconductor.RoleName
import com.typesafe.config.ConfigFactory
// Define test configuration
object SampleMultiNodeConfig extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(ConfigFactory.parseString("""
akka.actor.provider = remote
akka.remote.artery.canonical.port = 0
"""))
}
// Create multi-node test
class SampleMultiNodeSpec extends MultiNodeSpec(SampleMultiNodeConfig)
with AnyWordSpecLike with Matchers {
import SampleMultiNodeConfig._
def initialParticipants = roles.size
"A multi-node cluster" must {
"wait for all nodes to start" in {
enterBarrier("startup")
}
"allow nodes to communicate" in {
runOn(first) {
// Code that runs only on first node
val actor = system.actorOf(Props[SampleActor](), "sample")
enterBarrier("actor-created")
}
runOn(second, third) {
enterBarrier("actor-created")
val actorSelection = system.actorSelection(node(first) / "user" / "sample")
// Test communication with first node's actor
}
enterBarrier("test-complete")
}
}
}The Akka Multi-Node TestKit is built around several key components:
Core configuration system for defining multi-node test participants, roles, and Akka settings. Essential for setting up distributed test scenarios.
abstract class MultiNodeConfig {
def commonConfig(config: Config): Unit
def nodeConfig(roles: RoleName*)(configs: Config*): Unit
def role(name: String): RoleName
def debugConfig(on: Boolean): Config
def deployOn(role: RoleName, deployment: String): Unit
def deployOnAll(deployment: String): Unit
def testTransport(on: Boolean): Unit
}Main test framework for creating and executing coordinated multi-node tests with barrier synchronization and node-specific execution.
abstract class MultiNodeSpec(
myself: RoleName,
system: ActorSystem,
roles: immutable.Seq[RoleName],
deployments: RoleName => Seq[String]
) extends TestKit(system) {
def initialParticipants: Int
def runOn(nodes: RoleName*)(thunk: => Unit): Unit
def isNode(nodes: RoleName*): Boolean
def enterBarrier(name: String*): Unit
def enterBarrier(max: FiniteDuration, name: String*): Unit
def node(role: RoleName): ActorPath
}Extension providing advanced coordination capabilities including barrier management and network failure injection for testing distributed system resilience.
class TestConductorExt(system: ExtendedActorSystem)
extends Extension with Conductor with Player {
// Conductor capabilities
def startController(participants: Int, name: RoleName, controllerPort: InetSocketAddress): Future[InetSocketAddress]
def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done]
def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done]
def disconnect(node: RoleName, target: RoleName): Future[Done]
def abort(node: RoleName, target: RoleName): Future[Done]
def shutdown(node: RoleName, abort: Boolean): Future[Done]
def removeNode(node: RoleName): Future[Done]
def getNodes: Future[Iterable[RoleName]]
// Player capabilities
def startClient(name: RoleName, controllerAddr: InetSocketAddress): Future[Done]
def enter(timeout: Timeout, name: immutable.Seq[String]): Unit
def getAddressFor(name: RoleName): Future[Address]
}System property based configuration for multi-node test execution including node identification, networking, and coordinator settings.
object MultiNodeSpec {
val maxNodes: Int
val selfName: String
val tcpPort: Int
val udpPort: Option[Int]
val selfPort: Int
val serverName: String
val serverPort: Int
val selfIndex: Int
def configureNextPortIfFixed(config: Config): Config
}System Properties Configuration
// Role identification
case class RoleName(name: String)
// Network traffic direction for failure injection
sealed trait Direction {
def includes(other: Direction): Boolean
}
object Direction {
case object Send extends Direction {
def includes(other: Direction): Boolean = other == Send
def getInstance: Direction = this // Java API
}
case object Receive extends Direction {
def includes(other: Direction): Boolean = other == Receive
def getInstance: Direction = this // Java API
}
case object Both extends Direction {
def includes(other: Direction): Boolean = true
def getInstance: Direction = this // Java API
}
// Java API factory methods
def sendDirection(): Direction = Send
def receiveDirection(): Direction = Receive
def bothDirection(): Direction = Both
}
// Completion marker for operations
sealed trait Done
case object Done extends Done
// Helper for awaiting futures with remaining duration
class AwaitHelper[T](w: Awaitable[T]) {
def await: T
}Multi-node tests require specific system properties:
-Dmultinode.max-nodes=3 # Number of participating nodes
-Dmultinode.host=localhost # Hostname of current node
-Dmultinode.port=0 # Port for current node (0 = automatic)
-Dmultinode.server-host=localhost # Hostname of conductor node
-Dmultinode.server-port=4711 # Port of conductor node
-Dmultinode.index=0 # Index of current node (0-based)akka {
actor.provider = remote
remote.artery.canonical {
hostname = ${multinode.host}
port = ${multinode.port}
}
testconductor {
barrier-timeout = 30s
query-timeout = 5s
}
}To use blackhole, passThrough, and throttle features:
object MyMultiNodeConfig extends MultiNodeConfig {
// Enable test transport for failure injection
testTransport(on = true)
}trait STMultiNodeSpec extends MultiNodeSpecCallbacks
with AnyWordSpecLike with Matchers with BeforeAndAfterAll {
override def beforeAll() = multiNodeSpecBeforeAll()
override def afterAll() = multiNodeSpecAfterAll()
}
class MyTest extends MultiNodeSpec(MyConfig) with STMultiNodeSpec {
// Test implementation
}class MyTest extends MultiNodeSpec(MyConfig) with PerfFlamesSupport {
"performance test" in {
runPerfFlames(first, second)(5.seconds)
// Test code that will be profiled after 5 second delay
}
}