The MultiNodeSpec class is the main framework for creating and executing coordinated multi-node tests. It extends TestKit and provides barrier synchronization, node-specific execution, and access to the TestConductor extension.
abstract class MultiNodeSpec(config: MultiNodeConfig)
extends TestKit(system) with MultiNodeSpecCallbacks
// Primary constructor (most commonly used)
def this(config: MultiNodeConfig)
// Constructor with custom ActorSystem creator
def this(config: MultiNodeConfig, actorSystemCreator: Config => ActorSystem)
Usage Example:
// Test configuration object: declares the roles that take part in the multi-node test.
object MyTestConfig extends MultiNodeConfig {
// Each role(...) call declares one named participant; the returned value is
// later passed to helpers such as runOn(...) and node(...).
val first = role("first")
val second = role("second")
}
class MyMultiNodeTest extends MultiNodeSpec(MyTestConfig)
with AnyWordSpecLike with Matchers {
def initialParticipants = roles.size
// Test implementation
}
def initialParticipants: Int
Must be implemented by subclasses to define the number of participants required before starting the test. This is typically set to the number of roles.
Usage Example:
class ClusterTest extends MultiNodeSpec(ClusterConfig) {
import ClusterConfig._
def initialParticipants = roles.size // Most common pattern
// Alternative: specific number
// def initialParticipants = 3
}
Requirements:
Must be declared as a def (not a val or lazy val).
def runOn(nodes: RoleName*)(thunk: => Unit): Unit
Executes the given code block only on the specified nodes. Other nodes will skip the block entirely.
Usage Example:
runOn(first) {
// This code only runs on the 'first' node
val cluster = Cluster(system)
cluster.join(cluster.selfAddress)
system.actorOf(Props[ClusterListener](), "listener")
}
runOn(second, third) {
// This code runs on both 'second' and 'third' nodes
val cluster = Cluster(system)
cluster.join(node(first).address)
}
Parameters:
nodes: RoleName* - Variable number of role names to execute on
thunk: => Unit - Code block to execute (call-by-name)
def isNode(nodes: RoleName*): Boolean
Checks if the current node matches any of the specified roles.
Usage Example:
if (isNode(first)) {
log.info("I am the first node")
} else if (isNode(second, third)) {
log.info("I am either second or third node")
}
// Using in conditionals
val seedNode = if (isNode(first)) cluster.selfAddress else node(first).address
Parameters:
nodes: RoleName* - Role names to check against
Returns: Boolean - True if the current node matches any of the specified roles
def enterBarrier(name: String*): Unit
Enters the named barriers in the specified order using the default barrier timeout.
Usage Example:
"cluster formation" must {
"start all nodes" in {
// All nodes wait here until everyone reaches this point
enterBarrier("startup")
}
"form cluster" in {
runOn(first) {
Cluster(system).join(Cluster(system).selfAddress)
}
enterBarrier("cluster-started")
runOn(second, third) {
Cluster(system).join(node(first).address)
}
enterBarrier("all-joined")
}
}
Parameters:
name: String* - Variable number of barrier names to enter in sequence
def enterBarrier(max: FiniteDuration, name: String*): Unit
Enters barriers with a custom timeout, overriding the default barrier timeout.
Usage Example:
// Use longer timeout for slow operations
enterBarrier(60.seconds, "slow-initialization")
// Multiple barriers with custom timeout
enterBarrier(30.seconds, "phase1", "phase2", "phase3")
Parameters:
max: FiniteDuration - Maximum time to wait for all nodes to reach the barriers
name: String* - Barrier names to enter in sequence
Note: The timeout is automatically scaled using Duration.dilated based on the akka.test.timefactor configuration.
def node(role: RoleName): ActorPath
Returns the root ActorPath for the specified role, enabling actor selection and messaging between nodes.
Usage Example:
// Get reference to actor on another node
val remoteActor = system.actorSelection(node(first) / "user" / "serviceActor")
// Send message to remote actor
remoteActor ! "Hello from another node"
// Create actor path for deployment
val remotePath = RootActorPath(node(second).address) / "user" / "worker"
Parameters:
role: RoleName - Role name to get the address for
Returns: ActorPath - Root actor path for the specified node
def muteDeadLetters(messageClasses: Class[_]*)(sys: ActorSystem = system): Unit
Suppresses dead letter logging for the specified message types, useful for tests that intentionally create dead letters.
Usage Example:
// muteDeadLetters has two parameter lists: (messageClasses: Class[_]*)(sys: ActorSystem = system).
// The second list must always be applied, even when relying on its default,
// so the trailing () is required; without it the call is not executed.
// Mute all dead letters on the current system
muteDeadLetters()()
// Mute specific message types on the current system
muteDeadLetters(classOf[String], classOf[MyMessage])()
// Mute on specific system
muteDeadLetters(classOf[ClusterEvent.MemberUp])(otherSystem)
Parameters:
messageClasses: Class[_]* - Message classes to mute (empty = mute all)
sys: ActorSystem - Actor system to apply muting to (defaults to the current system)
protected def startNewSystem(): ActorSystem
Creates a new ActorSystem with the same configuration and re-registers with the TestConductor. Used for testing system restarts and recovery scenarios.
Usage Example:
"system recovery" must {
"restart and rejoin cluster" in {
runOn(second) {
// Shutdown current system
system.terminate()
Await.ready(system.whenTerminated, 10.seconds)
// Start new system with same configuration
val newSystem = startNewSystem()
// New system is automatically registered with TestConductor
enterBarrier("system-restarted")
}
}
}
Returns: ActorSystem - New actor system with injected deployments and TestConductor
Note: Must be called before entering barriers or using TestConductor after system termination.
MultiNodeSpec extends TestKit, providing access to essential testing utilities:
def expectMsg[T](obj: T): T
def expectMsg[T](d: FiniteDuration, obj: T): T
def expectMsgType[T](implicit t: ClassTag[T]): T
def expectMsgType[T](d: FiniteDuration)(implicit t: ClassTag[T]): T
def expectNoMsg(): Unit
def expectNoMsg(d: FiniteDuration): Unit
Usage Example:
runOn(first) {
val probe = TestProbe()
val actor = system.actorOf(Props[MyActor]())
actor.tell("ping", probe.ref)
probe.expectMsg(5.seconds, "pong")
probe.expectNoMsg(1.second)
}
def awaitCond(p: => Boolean): Unit
def awaitCond(p: => Boolean, max: Duration): Unit
def awaitCond(p: => Boolean, max: Duration, interval: Duration): Unit
Repeatedly evaluates the condition until it becomes true or the timeout is reached.
Usage Example:
runOn(first, second) {
awaitCond(Cluster(system).state.members.size == 2, 10.seconds)
}
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
def within[T](max: FiniteDuration)(f: => T): T
Executes the block and verifies that it completes within the specified time bounds.
Usage Example:
within(5.seconds, 15.seconds) {
// This block must complete between 5 and 15 seconds
val cluster = Cluster(system)
cluster.join(node(first).address)
awaitCond(cluster.readView.members.size == roles.size)
}
MultiNodeSpec implements the MultiNodeSpecCallbacks trait for test framework integration:
trait MultiNodeSpecCallbacks {
def multiNodeSpecBeforeAll(): Unit
def multiNodeSpecAfterAll(): Unit
}
def multiNodeSpecBeforeAll(): Unit
Called once before all test cases start. Override for custom initialization.
Usage Example:
class MyTest extends MultiNodeSpec(MyConfig) {
// Hook invoked once before any test case runs.
// The parent implementation must be reached through `super`; an unqualified
// call would resolve to this override itself and recurse without end.
// NOTE(review): upstream MultiNodeSpec appears to declare this hook final
// (delegating to atStartup()) — confirm that overriding it compiles for your version.
override def multiNodeSpecBeforeAll(): Unit = {
  super.multiNodeSpecBeforeAll() // Call parent implementation
  // Custom setup logic
  log.info("Starting multi-node test suite")
}
}
def multiNodeSpecAfterAll(): Unit
Called once after all test cases complete. Override for custom cleanup.
Usage Example:
class MyTest extends MultiNodeSpec(MyConfig) {
// Hook invoked once after every test case has completed.
// Custom cleanup runs first, then the parent implementation is reached through
// `super`; an unqualified call would resolve to this override itself and recurse
// without end.
// NOTE(review): upstream MultiNodeSpec appears to declare this hook final
// (delegating to afterTermination()) — confirm that overriding it compiles for your version.
override def multiNodeSpecAfterAll(): Unit = {
  // Custom cleanup logic
  log.info("Multi-node test suite completed")
  super.multiNodeSpecAfterAll() // Call parent implementation
}
}
For ScalaTest integration, use the STMultiNodeSpec trait:
trait STMultiNodeSpec extends MultiNodeSpecCallbacks
with AnyWordSpecLike with Matchers with BeforeAndAfterAll {
override def beforeAll(): Unit = multiNodeSpecBeforeAll()
override def afterAll(): Unit = multiNodeSpecAfterAll()
}
Usage Example:
class MyTest extends MultiNodeSpec(MyConfig) with STMultiNodeSpec {
// Test implementation automatically gets proper setup/teardown
"My distributed system" must {
"handle basic operations" in {
// Test code here
}
}
}
val myself: RoleName // Current node's role
def roles: immutable.Seq[RoleName] // All registered roles
val log: LoggingAdapter // Logging adapter
var testConductor: TestConductorExt // Access to barriers and failure injection
The TestConductor is automatically initialized and provides access to advanced coordination features like network failure injection.
Usage Example:
"network partition test" must {
"handle split brain" in {
// Create network partition
testConductor.blackhole(first, second, Direction.Both).await
enterBarrier("partition-created")
// Test behavior during partition
// Restore network
testConductor.passThrough(first, second, Direction.Both).await
enterBarrier("partition-healed")
}
}
def shutdownTimeout: FiniteDuration // Timeout for system shutdown (default 15.seconds)
def verifySystemShutdown: Boolean // Whether to verify clean shutdown (default false)
protected def atStartup(): Unit
Override to perform initialization when the entire test is starting up.
Usage Example:
class MyTest extends MultiNodeSpec(MyConfig) {
override protected def atStartup(): Unit = {
log.info("Multi-node test starting")
// Custom initialization logic
}
}
protected def afterTermination(): Unit
Override to perform cleanup when the entire test is terminating.
Usage Example:
class MyTest extends MultiNodeSpec(MyConfig) {
override protected def afterTermination(): Unit = {
log.info("Multi-node test completed")
// Custom cleanup logic
}
}
MultiNodeSpec provides an implicit conversion for enhanced Future handling:
implicit def awaitHelper[T](w: Awaitable[T]): AwaitHelper[T]
class AwaitHelper[T](w: Awaitable[T]) {
def await: T // Uses remaining duration from enclosing within block or QueryTimeout
}
Usage Example:
// Instead of using Await.result explicitly
val result = someFuture.await
// Automatically uses remaining time from within block
within(30.seconds) {
val result1 = future1.await // Has up to 30 seconds
val result2 = future2.await // Has remaining time after future1
}
object ClusterTestConfig extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(ConfigFactory.parseString("""
akka {
actor.provider = cluster
remote.artery.canonical.port = 0
cluster {
seed-nodes = []
auto-down-unreachable-after = 2s
}
}
"""))
}
/**
 * End-to-end example: three nodes start, form a cluster, and then one node leaves.
 *
 * The spec mixes in STMultiNodeSpec (shown earlier on this page) rather than
 * AnyWordSpecLike/Matchers directly, so that ScalaTest's beforeAll/afterAll are
 * wired to multiNodeSpecBeforeAll/multiNodeSpecAfterAll. Expected member counts
 * are derived from `roles.size` so the example stays correct if roles are added.
 */
class ClusterTest extends MultiNodeSpec(ClusterTestConfig) with STMultiNodeSpec {
  import ClusterTestConfig._

  // Every declared role must be present before the test starts.
  def initialParticipants: Int = roles.size

  "A cluster" must {
    "start up" in {
      // All nodes wait here until everyone reaches this point.
      enterBarrier("startup")
    }

    "form cluster" in {
      // The first node joins itself to bootstrap the cluster.
      runOn(first) {
        Cluster(system).join(Cluster(system).selfAddress)
      }
      enterBarrier("first-joined")

      // The remaining nodes join through the first node.
      runOn(second, third) {
        Cluster(system).join(node(first).address)
      }

      // Every node waits until it sees all participants as members.
      within(15.seconds) {
        awaitCond(Cluster(system).state.members.size == roles.size)
      }
      enterBarrier("cluster-formed")
    }

    "handle node removal" in {
      // The first node asks the third node to leave the cluster.
      runOn(first) {
        Cluster(system).leave(node(third).address)
      }

      // The surviving nodes wait until the membership shrinks by one.
      runOn(first, second) {
        within(10.seconds) {
          awaitCond(Cluster(system).state.members.size == roles.size - 1)
        }
      }

      // The leaving node waits for its own cluster extension to shut down.
      runOn(third) {
        within(10.seconds) {
          awaitCond(Cluster(system).isTerminated)
        }
      }
      enterBarrier("node-removed")
    }
  }
}