Multi-Node Test Specification

The MultiNodeSpec class is the main framework for creating and executing coordinated multi-node tests. It extends TestKit and provides barrier synchronization, node-specific execution, and access to the TestConductor extension.

MultiNodeSpec Class

abstract class MultiNodeSpec(config: MultiNodeConfig) 
  extends TestKit(system) with MultiNodeSpecCallbacks

Constructors

// Constructor taking a MultiNodeConfig (most commonly used)
def this(config: MultiNodeConfig)

// Constructor with custom ActorSystem creator
def this(config: MultiNodeConfig, actorSystemCreator: Config => ActorSystem)

Usage Example:

object MyTestConfig extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
}

class MyMultiNodeTest extends MultiNodeSpec(MyTestConfig) 
  with AnyWordSpecLike with Matchers {
  
  def initialParticipants = roles.size
  
  // Test implementation
}

Required Implementation

Initial Participants

def initialParticipants: Int

Must be implemented by subclasses to define the number of participants required before starting the test. This is typically set to the number of roles.

Usage Example:

class ClusterTest extends MultiNodeSpec(ClusterConfig) {
  import ClusterConfig._
  
  def initialParticipants = roles.size  // Most common pattern
  
  // Alternative: specific number
  // def initialParticipants = 3
}

Requirements:

  • Must be a def (not val or lazy val)
  • Must return a value greater than 0
  • Must not exceed the total number of available nodes
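
The def requirement is easiest to see with a small model of Scala's initialization order. The sketch below assumes the base class reads the value while its own constructor is still running, which is a guess at the reason behind the requirement. Base, WithDef, and WithVal are hypothetical names and not part of the testkit.

```scala
// Hypothetical stand-in for a base class that reads an abstract member
// while its own constructor is still running.
abstract class Base {
  def participants: Int
  // Evaluated during Base's constructor, before subclass fields are assigned.
  val seenDuringInit: Int = participants
}

// A def is computed on every call, so the base constructor sees the real value.
class WithDef extends Base {
  def participants: Int = 3
}

// A val is a field that is assigned only after Base's constructor has finished,
// so the base constructor observes the JVM default for Int.
class WithVal extends Base {
  val participants: Int = 3
}
```

In this model, new WithDef().seenDuringInit is 3, while new WithVal().seenDuringInit is 0 even though participants later reads as 3.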

Node Execution Control

Run On Specific Nodes

def runOn(nodes: RoleName*)(thunk: => Unit): Unit

Executes the given code block only on the specified nodes. Other nodes will skip the block entirely.

Usage Example:

runOn(first) {
  // This code only runs on the 'first' node
  val cluster = Cluster(system)
  cluster.join(cluster.selfAddress)
  
  system.actorOf(Props[ClusterListener](), "listener")
}

runOn(second, third) {
  // This code runs on both 'second' and 'third' nodes
  val cluster = Cluster(system)  
  cluster.join(node(first).address)
}

Parameters:

  • nodes: RoleName* - Variable number of role names to execute on
  • thunk: => Unit - Code block to execute (call-by-name)

Node Identity Check

def isNode(nodes: RoleName*): Boolean

Checks if the current node matches any of the specified roles.

Usage Example:

if (isNode(first)) {
  log.info("I am the first node")
} else if (isNode(second, third)) {
  log.info("I am either second or third node")
}

// Using in conditionals
val seedNode = if (isNode(first)) cluster.selfAddress else node(first).address

Parameters:

  • nodes: RoleName* - Role names to check against

Returns: Boolean - True if current node matches any specified role
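
The relationship between isNode and runOn can be sketched without any Akka dependency. This is a minimal model that assumes runOn is simply a guarded evaluation of the by-name block. The RoleName case class and NodeModel class here are hypothetical stand-ins, not the testkit's own types.

```scala
// Minimal stand-in for the testkit's role type (hypothetical, for illustration only).
final case class RoleName(name: String)

// Models a single node that knows its own role.
class NodeModel(val myself: RoleName) {
  // True if this node's role is among the given roles.
  def isNode(nodes: RoleName*): Boolean = nodes.contains(myself)

  // Evaluates the by-name block only when this node matches; otherwise skips it.
  def runOn(nodes: RoleName*)(thunk: => Unit): Unit =
    if (isNode(nodes: _*)) thunk
}
```

Because thunk is call-by-name, a node that does not match never evaluates the block at all, including any side effects inside it.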

Barrier Synchronization

Basic Barrier Entry

def enterBarrier(name: String*): Unit

Enters the named barriers in the specified order using the default barrier timeout.

Usage Example:

"cluster formation" must {
  "start all nodes" in {
    // All nodes wait here until everyone reaches this point
    enterBarrier("startup")
  }
  
  "form cluster" in {
    runOn(first) {
      Cluster(system).join(Cluster(system).selfAddress)
    }
    enterBarrier("cluster-started")
    
    runOn(second, third) {
      Cluster(system).join(node(first).address)  
    }
    enterBarrier("all-joined")
  }
}

Parameters:

  • name: String* - Variable number of barrier names to enter in sequence
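
To illustrate what "all nodes wait here" means, the following sketch models nodes as threads inside one JVM and uses java.util.concurrent.CyclicBarrier in place of the TestConductor. This is a swapped-in technique for illustration only: the real barrier is coordinated over the network, and BarrierModel is a hypothetical name.

```scala
import java.util.concurrent.{CyclicBarrier, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicIntegerArray}

object BarrierModel {
  // Starts one thread per participant. Each thread finishes its "setup",
  // waits at the shared barrier, then records how many participants had
  // finished setup at the moment it was released.
  def simulate(participants: Int): Seq[Int] = {
    val finishedSetup = new AtomicInteger(0)
    val observed = new AtomicIntegerArray(participants)
    val barrier = new CyclicBarrier(participants)

    val threads = (0 until participants).map { i =>
      new Thread(new Runnable {
        def run(): Unit = {
          Thread.sleep(20L * i)                // participants arrive at different times
          finishedSetup.incrementAndGet()      // "setup" is done on this participant
          barrier.await(5, TimeUnit.SECONDS)   // block until every participant has arrived
          observed.set(i, finishedSetup.get()) // what this participant sees after release
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    (0 until participants).map(i => observed.get(i))
  }
}
```

Every participant observes the full count after the barrier, which is the guarantee tests rely on when they place enterBarrier between a setup step on one node and a dependent step on another.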

Barrier Entry with Timeout

def enterBarrier(max: FiniteDuration, name: String*): Unit

Enters barriers with a custom timeout, overriding the default barrier timeout.

Usage Example:

// Use longer timeout for slow operations
enterBarrier(60.seconds, "slow-initialization")

// Multiple barriers with custom timeout
enterBarrier(30.seconds, "phase1", "phase2", "phase3")

Parameters:

  • max: FiniteDuration - Maximum time to wait for all nodes to reach barriers
  • name: String* - Barrier names to enter in sequence

Note: The timeout is automatically scaled (via the dilated extension method from akka.testkit) by the akka.test.timefactor configuration value.
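
The scaling itself is plain multiplication. The sketch below shows the arithmetic using only the standard library. Dilation.dilated is a hypothetical helper, and its factor parameter stands in for the akka.test.timefactor value.

```scala
import scala.concurrent.duration._

object Dilation {
  // Scales a timeout by a time factor; `factor` stands in for akka.test.timefactor.
  def dilated(timeout: FiniteDuration, factor: Double): FiniteDuration =
    Duration.fromNanos((timeout.toNanos * factor).round)
}
```

With a factor of 2.0, a 30-second barrier timeout becomes 60 seconds, which is why slow CI machines are usually handled by raising the time factor rather than editing individual timeouts.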

Node Communication

Node Address Resolution

def node(role: RoleName): ActorPath

Returns the root ActorPath for the specified role, enabling actor selection and messaging between nodes.

Usage Example:

// Get reference to actor on another node
val remoteActor = system.actorSelection(node(first) / "user" / "serviceActor")

// Send message to remote actor
remoteActor ! "Hello from another node"

// Create actor path for deployment
val remotePath = RootActorPath(node(second).address) / "user" / "worker"

Parameters:

  • role: RoleName - Role name to get address for

Returns: ActorPath - Root actor path for the specified node

Utility Methods

Dead Letter Suppression

def muteDeadLetters(messageClasses: Class[_]*)(sys: ActorSystem = system): Unit

Suppresses dead-letter logging for the specified message types. This is useful for tests that intentionally create dead letters.

Usage Example:

// Mute all dead letters (the second parameter list must still be written, even when empty)
muteDeadLetters()()

// Mute specific message types on the current system
muteDeadLetters(classOf[String], classOf[MyMessage])()

// Mute on a specific system
muteDeadLetters(classOf[ClusterEvent.MemberUp])(otherSystem)

Parameters:

  • messageClasses: Class[_]* - Message classes to mute (empty = mute all)
  • sys: ActorSystem - Actor system to apply muting (defaults to current system)
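
The signature has two parameter lists, and in Scala a default value in the second list does not make the list itself optional. The sketch below reproduces that call shape with a hypothetical mute method whose String result exists only to make the behavior visible. It is not the testkit's implementation.

```scala
object DefaultArgLists {
  // Same shape as muteDeadLetters: varargs first, then a defaulted second list.
  // `mute` and its String result are hypothetical; they only expose the call shape.
  def mute(classes: Class[_]*)(target: String = "default-system"): String = {
    val what =
      if (classes.isEmpty) "all"
      else classes.map(_.getSimpleName).mkString(",")
    s"$what@$target"
  }
}
```

Here DefaultArgLists.mute()() applies the default target, while writing only mute() leaves the second list unapplied, so the method body never runs.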

System Restart

protected def startNewSystem(): ActorSystem

Creates a new ActorSystem with the same configuration and re-registers with the TestConductor. Used for testing system restarts and recovery scenarios.

Usage Example:

"system recovery" must {
  "restart and rejoin cluster" in {
    runOn(second) {
      // Shut down the current system and wait for termination
      system.terminate()
      Await.ready(system.whenTerminated, 10.seconds)
      
      // Start a new system with the same configuration;
      // it is automatically registered with the TestConductor
      val newSystem = startNewSystem()
    }
    
    // Every node must enter the barrier, so it sits outside runOn
    enterBarrier("system-restarted")
  }
}

Returns: ActorSystem - New actor system with injected deployments and TestConductor

Note: Must be called before entering barriers or using TestConductor after system termination.

TestKit Inherited Methods

MultiNodeSpec extends TestKit, providing access to essential testing utilities:

Message Expectations

def expectMsg[T](obj: T): T
def expectMsg[T](d: FiniteDuration, obj: T): T
def expectMsgType[T](implicit t: ClassTag[T]): T  
def expectMsgType[T](d: FiniteDuration)(implicit t: ClassTag[T]): T
def expectNoMessage(): Unit                     // replaces the deprecated expectNoMsg
def expectNoMessage(max: FiniteDuration): Unit

Usage Example:

runOn(first) {
  val probe = TestProbe()
  val actor = system.actorOf(Props[MyActor]())
  
  actor.tell("ping", probe.ref)
  probe.expectMsg(5.seconds, "pong")
  probe.expectNoMessage(1.second)  // expectNoMsg is deprecated
}

Conditional Waiting

def awaitCond(p: => Boolean): Unit
def awaitCond(p: => Boolean, max: Duration): Unit
def awaitCond(p: => Boolean, max: Duration, interval: Duration): Unit

Repeatedly evaluates the condition until it becomes true or the timeout is reached. If the timeout expires first, the test fails.

Usage Example:

runOn(first, second) {
  awaitCond(Cluster(system).state.members.size == 2, 10.seconds)
}
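
The polling behavior can be sketched with the standard library alone. This is a simplified model assuming a fixed polling interval and an AssertionError on timeout. Polling.awaitCondSketch is a hypothetical name, and TestKit's own implementation may differ in details such as how it derives its default timeout.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

object Polling {
  // Re-evaluates `p` every `interval` until it holds or `max` has elapsed.
  def awaitCondSketch(
      p: => Boolean,
      max: FiniteDuration,
      interval: FiniteDuration = 100.millis): Unit = {
    val deadline = max.fromNow

    @tailrec
    def poll(): Unit =
      if (!p) {
        if (deadline.isOverdue()) throw new AssertionError(s"timeout $max expired")
        // Sleep for one interval, but never past the deadline.
        val pauseMillis = math.max(0L, math.min(interval.toMillis, deadline.timeLeft.toMillis))
        Thread.sleep(pauseMillis)
        poll()
      }

    poll()
  }
}
```

Because the condition is passed by name, it is re-evaluated on every poll, so it should be cheap and free of side effects that matter to the test.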

Time-Bounded Execution

def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
def within[T](max: FiniteDuration)(f: => T): T

Executes the block and verifies that it completes within the specified time bounds.

Usage Example:

within(5.seconds, 15.seconds) {
  // Fails if this block finishes in under 5 seconds or takes longer than 15 seconds
  val cluster = Cluster(system)
  cluster.join(node(first).address)
  // Use the public state API; readView is internal to akka.cluster
  awaitCond(cluster.state.members.size == roles.size)
}

MultiNodeSpecCallbacks Interface

MultiNodeSpec implements the MultiNodeSpecCallbacks trait for test framework integration:

trait MultiNodeSpecCallbacks {
  def multiNodeSpecBeforeAll(): Unit
  def multiNodeSpecAfterAll(): Unit
}

Before All Hook

def multiNodeSpecBeforeAll(): Unit

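Call this once before all test cases start, typically from the test framework's beforeAll hook. MultiNodeSpec provides a final implementation that invokes atStartup(), so put custom initialization in atStartup() rather than overriding this method.

Usage Example:

class MyTest extends MultiNodeSpec(MyConfig) {
  def initialParticipants = roles.size

  // multiNodeSpecBeforeAll() is final in MultiNodeSpec and delegates to atStartup()
  override protected def atStartup(): Unit = {
    // Custom setup logic
    log.info("Starting multi-node test suite")
  }
}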

After All Hook

def multiNodeSpecAfterAll(): Unit

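Call this once after all test cases complete, typically from the test framework's afterAll hook. MultiNodeSpec provides a final implementation that shuts down the actor system and then invokes afterTermination(), so put custom cleanup in afterTermination() rather than overriding this method.

Usage Example:

class MyTest extends MultiNodeSpec(MyConfig) {
  def initialParticipants = roles.size

  // multiNodeSpecAfterAll() is final in MultiNodeSpec and ends by calling afterTermination()
  override protected def afterTermination(): Unit = {
    // Custom cleanup logic (the actor system has already been shut down here)
    log.info("Multi-node test suite completed")
  }
}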

ScalaTest Integration

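For ScalaTest integration, define a small trait such as STMultiNodeSpec (the name follows the convention used in the Akka documentation) that forwards ScalaTest's beforeAll and afterAll to the callbacks: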

trait STMultiNodeSpec extends MultiNodeSpecCallbacks 
  with AnyWordSpecLike with Matchers with BeforeAndAfterAll {
  
  override def beforeAll(): Unit = multiNodeSpecBeforeAll()
  override def afterAll(): Unit = multiNodeSpecAfterAll()
}

Usage Example:

class MyTest extends MultiNodeSpec(MyConfig) with STMultiNodeSpec {
  // Test implementation automatically gets proper setup/teardown
  
  "My distributed system" must {
    "handle basic operations" in {
      // Test code here
    }
  }
}

Properties and State

Role Information

val myself: RoleName                           // Current node's role
def roles: immutable.Seq[RoleName]            // All registered roles
val log: LoggingAdapter                       // Logging adapter

TestConductor Access

var testConductor: TestConductorExt           // Access to barriers and failure injection

The TestConductor is automatically initialized and provides access to advanced coordination features like network failure injection.

Usage Example:

"network partition test" must {
  "handle split brain" in {
    // Issue the failure injection from one node only
    // (requires testTransport(on = true) in the MultiNodeConfig)
    runOn(first) {
      testConductor.blackhole(first, second, Direction.Both).await
    }
    
    enterBarrier("partition-created")
    
    // Test behavior during partition
    
    // Restore network
    runOn(first) {
      testConductor.passThrough(first, second, Direction.Both).await
    }
    
    enterBarrier("partition-healed")
  }
}

Configuration Access

def shutdownTimeout: FiniteDuration           // Timeout for system shutdown (default 15.seconds, scaled by akka.test.timefactor)
def verifySystemShutdown: Boolean             // Whether to verify clean shutdown (default false)

Lifecycle Hooks

Startup Hook

protected def atStartup(): Unit

Override to perform initialization when the entire test is starting up.

Usage Example:

class MyTest extends MultiNodeSpec(MyConfig) {
  override protected def atStartup(): Unit = {
    log.info("Multi-node test starting")
    // Custom initialization logic
  }
}

Termination Hook

protected def afterTermination(): Unit

Override to perform cleanup when the entire test is terminating.

Usage Example:

class MyTest extends MultiNodeSpec(MyConfig) {
  override protected def afterTermination(): Unit = {
    log.info("Multi-node test completed")
    // Custom cleanup logic
  }
}

Awaitable Helper

MultiNodeSpec provides an implicit conversion that adds an await method to any Awaitable, such as a Future:

implicit def awaitHelper[T](w: Awaitable[T]): AwaitHelper[T]

class AwaitHelper[T](w: Awaitable[T]) {
  def await: T  // Uses remaining duration from enclosing within block or QueryTimeout
}

Usage Example:

// Instead of using Await.result explicitly
val result = someFuture.await

// Automatically uses remaining time from within block
within(30.seconds) {
  val result1 = future1.await  // Has up to 30 seconds
  val result2 = future2.await  // Has remaining time after future1
}
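
How the remaining time budget flows from a within block into await can be modeled with the standard library. The sketch below is a simplified, single-threaded model: its within only tracks a deadline and does not verify the bounds afterwards, and defaultQueryTimeout is a hypothetical stand-in for the TestConductor's QueryTimeout setting. RemainingBudget and demo are illustrative names.

```scala
import scala.concurrent.{Await, Awaitable, Future}
import scala.concurrent.duration._

object RemainingBudget {
  // Hypothetical fallback, standing in for the TestConductor's QueryTimeout.
  val defaultQueryTimeout: FiniteDuration = 5.seconds

  // Deadline of the enclosing within block, if any (single-threaded sketch).
  private var deadline: Option[Deadline] = None

  // Simplified within: records the deadline for the duration of the block.
  def within[T](max: FiniteDuration)(body: => T): T = {
    val previous = deadline
    deadline = Some(max.fromNow)
    try body finally deadline = previous
  }

  // Time left in the enclosing within block, or the default outside of one.
  def remainingOr(default: FiniteDuration): FiniteDuration =
    deadline.map(_.timeLeft).getOrElse(default)

  // Adds `await` to any Awaitable, bounded by the remaining budget.
  implicit class AwaitHelper[T](w: Awaitable[T]) {
    def await: T = Await.result(w, remainingOr(defaultQueryTimeout))
  }

  // Both awaits share the 30-second budget of the enclosing within block.
  def demo(): Int = within(30.seconds) {
    val a = Future.successful(20).await
    val b = Future.successful(22).await
    a + b
  }
}
```

The design point is that each await draws from one shared deadline rather than getting a fresh timeout, so a slow first future leaves less time for the second.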

Complete Test Example

object ClusterTestConfig extends MultiNodeConfig {
  val first = role("first")
  val second = role("second") 
  val third = role("third")
  
  commonConfig(ConfigFactory.parseString("""
    akka {
      actor.provider = cluster
      remote.artery.canonical.port = 0
      cluster {
        seed-nodes = []
        auto-down-unreachable-after = 2s
      }
    }
  """))
}

class ClusterTest extends MultiNodeSpec(ClusterTestConfig)
  with STMultiNodeSpec {  // STMultiNodeSpec (see ScalaTest Integration) wires up startup and shutdown
  
  import ClusterTestConfig._
  
  def initialParticipants = roles.size
  
  "A cluster" must {
    "start up" in {
      enterBarrier("startup")
    }
    
    "form cluster" in {
      runOn(first) {
        Cluster(system).join(Cluster(system).selfAddress)
      }
      enterBarrier("first-joined")
      
      runOn(second, third) {
        Cluster(system).join(node(first).address)
      }
      
      within(15.seconds) {
        awaitCond(Cluster(system).state.members.size == 3)
      }
      
      enterBarrier("cluster-formed")
    }
    
    "handle node removal" in {
      runOn(first) {
        Cluster(system).leave(node(third).address)
      }
      
      runOn(first, second) {
        within(10.seconds) {
          awaitCond(Cluster(system).state.members.size == 2)
        }
      }
      
      runOn(third) {
        within(10.seconds) {
          awaitCond(Cluster(system).isTerminated)
        }
      }
      
      enterBarrier("node-removed")
    }
  }
}