Test Conductor

The TestConductor extension provides advanced coordination capabilities for multi-node tests, including barrier management, network failure injection, and node lifecycle control. It combines both Conductor (controller) and Player (participant) roles in a single extension.

TestConductor Extension

object TestConductor extends ExtensionId[TestConductorExt] {
  def apply()(implicit ctx: ActorContext): TestConductorExt
  def get(system: ActorSystem): TestConductorExt
  def get(system: ClassicActorSystemProvider): TestConductorExt
}

class TestConductorExt(system: ExtendedActorSystem)
  extends Extension with Conductor with Player

Extension Access

Usage Example:

// Get extension instance
val testConductor = TestConductor(system)

// Or within an actor, using the implicit ActorContext
class MyActor extends Actor {
  val testConductor = TestConductor()

  def receive: Receive = Actor.emptyBehavior
}

Extension Factory Methods

object TestConductor extends ExtensionId[TestConductorExt] {
  def createExtension(system: ExtendedActorSystem): TestConductorExt
  def lookup(): ExtensionId[_ <: Extension]
  def apply(system: ActorSystem): TestConductorExt
  def apply()(implicit context: ActorContext): TestConductorExt
}

The TestConductor extension is automatically loaded and configured when accessed through the ExtensionId interface.

Conductor Capabilities

The Conductor trait provides orchestration features for controlling the test environment and coordinating between nodes.

Controller Management

Start Controller

def startController(
  participants: Int,
  name: RoleName, 
  controllerPort: InetSocketAddress
): Future[InetSocketAddress]

def startController(
  participants: Int,
  name: RoleName
): Future[InetSocketAddress]

Starts the TestConductor controller, which binds to a TCP port and coordinates all test participants. It also invokes startClient, so the controller node itself participates in barriers.

Usage Example:

// .await comes from MultiNodeSpec's implicit awaitHelper
val controllerPort = new InetSocketAddress("localhost", 4711)
val actualAddress = testConductor.startController(
  participants = 3,
  name = myself,
  controllerPort = controllerPort
).await

log.info(s"Controller started on $actualAddress")

Parameters:

  • participants: Int - Number of participants that must connect before tests begin
  • name: RoleName - Role name of the controller node
  • controllerPort: InetSocketAddress - Address to bind controller (port 0 = automatic)

Returns: Future[InetSocketAddress] - Actual bound address of the controller
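The "port 0 = automatic" convention is ordinary socket behaviour, which is why the returned address matters more than the requested one. The following is a minimal sketch using only java.net, not the TestConductor itself; `EphemeralPortDemo` and `bindEphemeral` are hypothetical names introduced for illustration.

```scala
import java.net.{ InetSocketAddress, ServerSocket }

object EphemeralPortDemo {
  // Bind to port 0 and report the address the OS actually assigned.
  // (Hypothetical helper; it only illustrates the port-0 convention.)
  def bindEphemeral(host: String): InetSocketAddress = {
    val socket = new ServerSocket()
    try {
      socket.bind(new InetSocketAddress(host, 0))
      new InetSocketAddress(host, socket.getLocalPort)
    } finally socket.close()
  }

  def main(args: Array[String]): Unit = {
    val requested = new InetSocketAddress("localhost", 0)
    val actual = bindEphemeral("localhost")
    println(s"requested port ${requested.getPort}, bound port ${actual.getPort}")
  }
}
```

In a test, this is the reason to hand participants the address returned by startController rather than the one passed in.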

Get Controller Address

def sockAddr: Future[InetSocketAddress]

Returns the actual bound address of the controller socket.

Usage Example:

val address = testConductor.sockAddr.await
println(s"Controller listening on ${address.getHostString}:${address.getPort}")

Network Failure Injection

Network failure injection requires enabling test transport in MultiNodeConfig:

testTransport(on = true)  // Must be set in MultiNodeConfig

Blackhole Traffic

def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done]

Switches the network pipeline to drop all messages between the specified nodes in the given direction.

Usage Example:

// Block all traffic between node1 and node2
testConductor.blackhole(node1, node2, Direction.Both).await

// Block only messages sent from node1 to node2  
testConductor.blackhole(node1, node2, Direction.Send).await

// Block only messages received by node1 from node2
testConductor.blackhole(node1, node2, Direction.Receive).await

Parameters:

  • node: RoleName - Node to apply the blackhole on
  • target: RoleName - Target node for the connection
  • direction: Direction - Traffic direction (Send, Receive, or Both)
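One way to reason about the three directions is to model each blackhole call as a rule installed on `node` for its link to `target`. The sketch below is a toy model under that assumption, not Akka's implementation; `BlackholeModel`, `Rule`, and `dropped` are hypothetical names, and the local `Direction` only mirrors the shape of the real one.

```scala
object BlackholeModel {
  sealed trait Direction
  object Direction {
    case object Send extends Direction
    case object Receive extends Direction
    case object Both extends Direction
  }

  // A rule installed on `node` concerning its link to `target`.
  final case class Rule(node: String, target: String, direction: Direction)

  // Decide whether a message travelling from `from` to `to` is dropped.
  def dropped(rules: Set[Rule], from: String, to: String): Boolean =
    rules.exists {
      case Rule(n, t, Direction.Send)    => n == from && t == to
      case Rule(n, t, Direction.Receive) => n == to && t == from
      case Rule(n, t, Direction.Both)    => (n == from && t == to) || (n == to && t == from)
    }

  def main(args: Array[String]): Unit = {
    val sendOnly = Set(Rule("node1", "node2", Direction.Send))
    println(dropped(sendOnly, "node1", "node2")) // node1 -> node2 is blocked
    println(dropped(sendOnly, "node2", "node1")) // the reverse path still works
  }
}
```

Under this model, Direction.Both on a single node is enough to cut the link in both directions, which matches how the examples above use it.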

Pass Through Traffic

def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done]

Restores normal message passing between nodes, removing any traffic restrictions.

Usage Example:

// Restore normal traffic after blackhole
testConductor.passThrough(node1, node2, Direction.Both).await

// Test network partition and recovery
testConductor.blackhole(node1, node2, Direction.Both).await
enterBarrier("partition-created")

// Test behavior during partition...

testConductor.passThrough(node1, node2, Direction.Both).await  
enterBarrier("partition-healed")
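If an assertion fails while the partition is active, the passThrough call is never reached and later tests may inherit a broken link. A possible guard is a small loan-pattern helper that always heals the link. This is a sketch in plain Scala; `PartitionScope` and `withPartition` are hypothetical names, and the `cut` and `heal` functions stand in for the blackhole and passThrough calls.

```scala
object PartitionScope {
  // Run `body` while the link is cut, and always heal it afterwards,
  // even if `body` throws (for example on a failed assertion).
  def withPartition[A](cut: () => Unit, heal: () => Unit)(body: => A): A = {
    cut()
    try body
    finally heal()
  }

  def main(args: Array[String]): Unit = {
    var partitioned = false
    val outcome = scala.util.Try {
      withPartition(() => { partitioned = true }, () => { partitioned = false }) {
        require(partitioned, "link should be cut inside the body")
        throw new IllegalStateException("simulated test failure")
      }
    }
    println(s"failed=${outcome.isFailure}, stillPartitioned=$partitioned")
  }
}
```

In a multi-node test the two functions would presumably wrap `testConductor.blackhole(...).await` and `testConductor.passThrough(...).await`, run on the controlling node only.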

Traffic Throttling (Deprecated)

@deprecated("Throttle is not implemented, use blackhole and passThrough.", "2.8.0")
def throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Double): Future[Done]

Note: Throttling functionality is deprecated. Use blackhole and passThrough for network failure simulation.

Connection Management

Disconnect Nodes

def disconnect(node: RoleName, target: RoleName): Future[Done]

Gracefully shuts down the network connection between two nodes.

Usage Example:

// Gracefully disconnect node1 from node2
testConductor.disconnect(node1, node2).await

enterBarrier("disconnected")

// Test behavior with broken connection...

Abort Connections

def abort(node: RoleName, target: RoleName): Future[Done]

Forcefully terminates the network connection using TCP_RESET between two nodes.

Usage Example:

// Forcefully abort connection (simulates an abrupt connection reset)
testConductor.abort(node1, node2).await

enterBarrier("connection-aborted")

Node Lifecycle Management

Shutdown Nodes

def shutdown(node: RoleName): Future[Done]
def shutdown(node: RoleName, abort: Boolean): Future[Done]

Shuts down a remote node's ActorSystem. The single-parameter overload shuts down gracefully; the two-parameter overload lets the caller choose between graceful and abrupt shutdown via abort.

Usage Example:

// Graceful shutdown (default)
testConductor.shutdown(worker1).await

// Explicit graceful shutdown
testConductor.shutdown(worker2, abort = false).await

// Abrupt shutdown (aborts the ActorSystem without a graceful stop)
testConductor.shutdown(worker3, abort = true).await

enterBarrier("nodes-shutdown")

Parameters:

  • node: RoleName - Node to shut down
  • abort: Boolean - If true, shutdown abruptly; if false, shutdown gracefully

Exit Nodes

def exit(node: RoleName, exitValue: Int): Future[Done]

Terminates a remote node using System.exit with the specified exit code.

Usage Example:

// Exit node with specific exit code
testConductor.exit(problematicNode, exitValue = 1).await

enterBarrier("node-exited")

Remove Nodes

def removeNode(node: RoleName): Future[Done]

Removes a node from test coordination so that the remaining nodes can still pass subsequent barriers.

Usage Example:

// Remove node from coordination
testConductor.removeNode(finishedNode).await

// Continue test with remaining nodes
enterBarrier("node-removed")

Get Connected Nodes

def getNodes: Future[Iterable[RoleName]]

Returns the collection of currently connected nodes.

Usage Example:

val connectedNodes = testConductor.getNodes.await
log.info(s"Connected nodes: ${connectedNodes.mkString(", ")}")

// Wait for all expected nodes
within(30.seconds) {
  awaitCond(testConductor.getNodes.await.size == expectedNodeCount)
}
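The polling pattern above (query repeatedly until a condition holds or a deadline passes) can be sketched without the test kit. The helper below is a simplified stand-in for within plus awaitCond, written with the standard library only; `PollUntil`, `pollUntil`, and `fakeGetNodes` are hypothetical names.

```scala
import scala.annotation.tailrec
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

object PollUntil {
  // Repeatedly run `query` until `done` accepts its result or `max` elapses.
  def pollUntil[A](max: FiniteDuration, interval: FiniteDuration)(query: () => Future[A])(
      done: A => Boolean): Option[A] = {
    val deadline = max.fromNow
    @tailrec def loop(): Option[A] = {
      val result = Await.result(query(), max)
      if (done(result)) Some(result)
      else if (deadline.isOverdue()) None
      else { Thread.sleep(interval.toMillis); loop() }
    }
    loop()
  }

  def main(args: Array[String]): Unit = {
    var joined = 0
    // Stand-in for testConductor.getNodes: one more node "connects" per call.
    val fakeGetNodes = () => Future.successful { joined += 1; (1 to joined).map(i => s"node$i") }
    println(pollUntil(2.seconds, 10.millis)(fakeGetNodes)(_.size == 3))
  }
}
```

Returning an Option instead of throwing is a design choice of this sketch; awaitCond itself fails the test when the deadline passes.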

Player Capabilities

The Player trait provides client-side functionality for participating in coordinated tests.

Client Connection

Start Client

def startClient(name: RoleName, controllerAddr: InetSocketAddress): Future[Done]

Connects to the TestConductor controller and registers as a test participant.

Usage Example:

val controllerAddr = new InetSocketAddress("testserver", 4711)
testConductor.startClient(myself, controllerAddr).await

log.info("Successfully connected to test controller")

Parameters:

  • name: RoleName - Role name of this client node
  • controllerAddr: InetSocketAddress - Address of the controller to connect to

Returns: Future[Done] - Completes when client is connected and all participants are ready

Barrier Coordination

Enter Barriers

def enter(name: String*): Unit
def enter(timeout: Timeout, name: immutable.Seq[String]): Unit

Enters the named barriers in sequence, blocking until all participants have reached each barrier.

Usage Example:

// Use default timeout
testConductor.enter("startup", "initialization", "ready")

// Use custom timeout
import akka.util.Timeout
import scala.collection.immutable
import scala.concurrent.duration._

val customTimeout = Timeout(60.seconds)
testConductor.enter(customTimeout, immutable.Seq("slow-barrier", "completion"))

Parameters:

  • name: String* or name: immutable.Seq[String] - Barrier names to enter in sequence
  • timeout: Timeout - Maximum time to wait for all participants

Throws: TimeoutException if any barrier times out
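The "in sequence, blocking until all participants arrive" semantics can be illustrated with java.util.concurrent.CyclicBarrier. This is a single-JVM analogy only: the real barriers are coordinated by the controller over the network, and `BarrierModel`, `enterAll`, `runParticipants`, and `lonelyTimesOut` are hypothetical names.

```scala
import java.util.concurrent.{ CyclicBarrier, Executors, TimeUnit, TimeoutException }
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._

object BarrierModel {
  // Enter each named barrier in order; every entry blocks until all parties arrive
  // or the timeout elapses (CyclicBarrier then throws TimeoutException).
  def enterAll(barriers: Map[String, CyclicBarrier], timeout: FiniteDuration, names: String*): Unit =
    names.foreach(name => barriers(name).await(timeout.toMillis, TimeUnit.MILLISECONDS))

  // Run `parties` participants through the same barrier sequence on dedicated threads
  // and return how many of them completed it.
  def runParticipants(parties: Int, names: String*): Int = {
    val pool = Executors.newFixedThreadPool(parties)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val barriers = names.map(n => n -> new CyclicBarrier(parties)).toMap
      val done = (1 to parties).map(_ => Future { enterAll(barriers, 5.seconds, names: _*); 1 })
      Await.result(Future.sequence(done), 10.seconds).sum
    } finally pool.shutdown()
  }

  // A lone participant cannot pass a barrier that expects more parties.
  def lonelyTimesOut(parties: Int): Boolean =
    try { enterAll(Map("never" -> new CyclicBarrier(parties)), 100.millis, "never"); false }
    catch { case _: TimeoutException => true }

  def main(args: Array[String]): Unit = {
    println(runParticipants(3, "startup", "initialization", "ready"))
    println(lonelyTimesOut(3))
  }
}
```

The second helper shows the failure mode described above: if one participant never arrives, the others time out rather than wait forever.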

Address Resolution

Get Node Address

def getAddressFor(name: RoleName): Future[Address]

Queries the controller for the transport address of a specific node.

Usage Example:

val remoteAddress = testConductor.getAddressFor(workerNode).await
val remoteActorPath = RootActorPath(remoteAddress) / "user" / "service"

val selection = system.actorSelection(remoteActorPath)
selection ! "Hello from another node"

Parameters:

  • name: RoleName - Role name to get address for

Returns: Future[Address] - Transport address of the specified node

Configuration Settings

The TestConductor extension reads configuration from the akka.testconductor section:

object Settings {
  val ConnectTimeout: FiniteDuration      // Connection timeout
  val ClientReconnects: Int               // Number of reconnection attempts  
  val ReconnectBackoff: FiniteDuration    // Backoff between reconnects
  val BarrierTimeout: Timeout             // Default barrier timeout
  val QueryTimeout: Timeout               // Default query timeout
  val PacketSplitThreshold: FiniteDuration // Packet splitting threshold
  val ServerSocketWorkerPoolSize: Int     // Server worker pool size
  val ClientSocketWorkerPoolSize: Int     // Client worker pool size
}
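To make the interplay of ClientReconnects and ReconnectBackoff concrete, here is a sketch of a bounded retry loop with a fixed pause between attempts. It is a model under stated assumptions, not Akka's client code: in particular, it treats the count as retries after the first attempt, which the source does not specify. `ReconnectModel` and `connectWithRetries` are hypothetical names.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }

object ReconnectModel {
  // Try `connect` once, then retry up to `reconnects` more times,
  // sleeping `backoff` between attempts. Returns the last failure if all attempts fail.
  def connectWithRetries[A](reconnects: Int, backoff: FiniteDuration)(connect: () => A): Try[A] = {
    @tailrec def attempt(remaining: Int): Try[A] =
      Try(connect()) match {
        case s @ Success(_)                   => s
        case f @ Failure(_) if remaining <= 0 => f
        case Failure(_) =>
          Thread.sleep(backoff.toMillis)
          attempt(remaining - 1)
      }
    attempt(reconnects)
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    // Simulated controller that refuses the first two connection attempts.
    val result = connectWithRetries(reconnects = 3, backoff = 10.millis) { () =>
      calls += 1
      if (calls < 3) throw new java.io.IOException(s"attempt $calls refused") else "connected"
    }
    println(s"$result after $calls calls")
  }
}
```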

Default Configuration:

akka.testconductor {
  connect-timeout = 20s
  client-reconnects = 30
  reconnect-backoff = 1s
  barrier-timeout = 30s
  query-timeout = 5s  
  packet-split-threshold = 100ms
  
  netty {
    server-socket-worker-pool {
      pool-size-min = 1
      pool-size-factor = 1.0
      pool-size-max = 2
    }
    client-socket-worker-pool {
      pool-size-min = 1
      pool-size-factor = 1.0
      pool-size-max = 2  
    }
  }
}
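Any of these values can be overridden by layering a test-specific configuration over the defaults. The sketch below uses the Typesafe Config library (it needs com.typesafe:config on the classpath) and copies only two of the defaults listed above; `TestConductorConfigSketch` is a hypothetical name and the 60s value is an arbitrary example.

```scala
import com.typesafe.config.{ Config, ConfigFactory }

object TestConductorConfigSketch {
  // Subset of the defaults listed above, used as the fallback layer.
  val defaults: Config = ConfigFactory.parseString("""
    akka.testconductor {
      barrier-timeout = 30s
      query-timeout = 5s
    }
  """)

  // Settings a slow test suite might want to relax (example value).
  val overrides: Config = ConfigFactory.parseString("""
    akka.testconductor.barrier-timeout = 60s
  """)

  // Keys present in `overrides` win; everything else falls back to `defaults`.
  val effective: Config = overrides.withFallback(defaults)

  def main(args: Array[String]): Unit = {
    println(effective.getDuration("akka.testconductor.barrier-timeout").getSeconds)
    println(effective.getDuration("akka.testconductor.query-timeout").getSeconds)
  }
}
```

Inside a MultiNodeConfig, the same override string could be passed to commonConfig, in the same way the complete example below passes its cluster settings.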

Direction Enum

sealed trait Direction
object Direction {
  case object Send extends Direction      // Outgoing traffic
  case object Receive extends Direction   // Incoming traffic  
  case object Both extends Direction      // Both directions
}

Complete Network Failure Test Example

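import scala.concurrent.duration._

import akka.actor.{ Actor, Props }
import akka.cluster.Cluster
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.testkit.ImplicitSender
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
// Direction must also be imported; its package depends on the Akka version

object NetworkFailureConfig extends MultiNodeConfig {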
  val controller = role("controller")
  val worker1 = role("worker1")
  val worker2 = role("worker2")
  
  // Enable network failure injection
  testTransport(on = true)
  
  commonConfig(ConfigFactory.parseString("""
    akka {
      actor.provider = cluster
      remote.artery.canonical.port = 0
      # auto-down-unreachable-after was removed in Akka 2.6;
      # configure a downing provider if the test needs downing
    }
  """))
}

class NetworkFailureTest extends MultiNodeSpec(NetworkFailureConfig)
  with AnyWordSpecLike with Matchers with ImplicitSender {
  
  import NetworkFailureConfig._
  
  def initialParticipants = roles.size
  
  "Network failure injection" must {
    "create network partition" in {
      // Create the echo actor first, so the partition check can tell
      // a dropped message apart from a missing recipient
      runOn(worker2) {
        system.actorOf(Props(new Actor {
          def receive = {
            case "ping" => sender() ! "pong"
          }
        }), "test")
      }
      
      enterBarrier("startup")
      
      // Verify communication works before the partition
      runOn(worker1) {
        system.actorSelection(node(worker2) / "user" / "test") ! "ping"
        expectMsg("pong")
      }
      
      enterBarrier("baseline-verified")
      
      // Create partition between worker nodes
      runOn(controller) {
        testConductor.blackhole(worker1, worker2, Direction.Both).await
      }
      
      enterBarrier("partition-created")
      
      // Test behavior during partition
      runOn(worker1) {
        // worker1 cannot communicate with worker2
        val selection = system.actorSelection(node(worker2) / "user" / "test")
        selection ! "ping"
        
        // No reply must arrive while traffic is blackholed
        expectNoMessage(3.seconds)
      }
      
      enterBarrier("partition-tested")
      
      // Restore network
      runOn(controller) {
        testConductor.passThrough(worker1, worker2, Direction.Both).await
      }
      
      enterBarrier("partition-healed")
      
      // Verify communication restored
      runOn(worker1) {
        val selection = system.actorSelection(node(worker2) / "user" / "test")
        selection ! "ping"
        expectMsg("pong")
      }
      
      enterBarrier("communication-restored")
    }
    
    "simulate node failure" in {
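      // Assumes the nodes formed a cluster in an earlier step (not shown)
      runOn(controller) {
        // Simulate sudden node death
        testConductor.abort(worker1, worker2).await
        testConductor.exit(worker2, 1).await
      }
      
      runOn(worker1) {
        // worker1 should detect worker2 as unreachable
        within(10.seconds) {
          awaitCond(Cluster(system).state.unreachable.nonEmpty)
        }
      }
      
      // worker2 has exited, so only the surviving nodes enter the barrier
      runOn(controller, worker1) {
        enterBarrier("node-failure-detected")
      }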
    }
  }
}