The TestConductor extension provides advanced coordination capabilities for multi-node tests, including barrier management, network failure injection, and node lifecycle control. It combines both Conductor (controller) and Player (participant) roles in a single extension.
object TestConductor extends ExtensionId[TestConductorExt] {
  def createExtension(system: ExtendedActorSystem): TestConductorExt
  def lookup(): ExtensionId[_ <: Extension]
  def apply()(implicit ctx: ActorContext): TestConductorExt
  def apply(system: ActorSystem): TestConductorExt
  def get(system: ActorSystem): TestConductorExt
  def get(system: ClassicActorSystemProvider): TestConductorExt
}

class TestConductorExt(system: ExtendedActorSystem)
  extends Extension with Conductor with Player

Usage Example:
// Get extension instance
val testConductor = TestConductor(system)
// Or within actor context
class MyActor extends Actor {
  val testConductor = TestConductor(context.system)
}
The TestConductor extension is automatically loaded and configured when accessed through the ExtensionId interface.
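In practice the extension is rarely looked up by hand in a multi-node test: MultiNodeSpec exposes a ready-to-use testConductor member once the test has started. A minimal sketch, assuming a hypothetical MyTestConfig defined as a MultiNodeConfig elsewhere:

// Minimal sketch: MyTestSpec and MyTestConfig are hypothetical names.
class MyTestSpec extends MultiNodeSpec(MyTestConfig) with AnyWordSpecLike {
  def initialParticipants = roles.size

  "A coordinated test" must {
    "start on every node" in {
      // MultiNodeSpec wires up the TestConductor extension itself;
      // the testConductor member is already usable here, no explicit lookup needed.
      testConductor.enter("all-started")
    }
  }
}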
The Conductor trait provides orchestration features for controlling the test environment and coordinating between nodes.
def startController(
  participants: Int,
  name: RoleName,
  controllerPort: InetSocketAddress
): Future[InetSocketAddress]

def startController(
  participants: Int,
  name: RoleName
): Future[InetSocketAddress]

Starts the TestConductor controller, which binds to a TCP port and coordinates all test participants. The controller node's own client connection is started automatically as part of this call.
Usage Example:
val controllerPort = new InetSocketAddress("localhost", 4711)
val boundAddress = testConductor.startController(
  participants = 3,
  name = myself,
  controllerPort = controllerPort
).await
log.info(s"Controller started on $boundAddress")

Parameters:
- participants: Int - Number of participants that must connect before tests begin
- name: RoleName - Role name of the controller node
- controllerPort: InetSocketAddress - Address to bind the controller to (port 0 = automatic)

Returns: Future[InetSocketAddress] - Actual bound address of the controller
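Since port 0 means an OS-assigned port, the actual address has to be read from the returned future. A minimal sketch, assuming it runs on the conductor node of a three-node test:

// Ask the OS for a free port and read back where the controller actually bound
val requested = new InetSocketAddress("localhost", 0)
val bound = testConductor.startController(
  participants = 3,
  name = myself,
  controllerPort = requested
).await
log.info(s"Controller bound to ${bound.getHostString}:${bound.getPort}")
// sockAddr (below) reports the same bound address afterwards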
def sockAddr: Future[InetSocketAddress]

Returns the actual bound address of the controller socket.
Usage Example:
val address = testConductor.sockAddr.await
println(s"Controller listening on ${address.getHostString}:${address.getPort}")Network failure injection requires enabling test transport in MultiNodeConfig:
testTransport(on = true) // Must be set in MultiNodeConfigdef blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done]Switches the network pipeline to drop all messages between the specified nodes in the given direction.
Usage Example:
// Block all traffic between node1 and node2
testConductor.blackhole(node1, node2, Direction.Both).await
// Block only messages sent from node1 to node2
testConductor.blackhole(node1, node2, Direction.Send).await
// Block only messages received by node1 from node2
testConductor.blackhole(node1, node2, Direction.Receive).await

Parameters:
- node: RoleName - Node to apply the blackhole on
- target: RoleName - Target node for the connection
- direction: Direction - Traffic direction (Send, Receive, or Both)

def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done]

Restores normal message passing between nodes, removing any traffic restrictions.
Usage Example:
// Restore normal traffic after blackhole
testConductor.passThrough(node1, node2, Direction.Both).await
// Test network partition and recovery
testConductor.blackhole(node1, node2, Direction.Both).await
enterBarrier("partition-created")
// Test behavior during partition...
testConductor.passThrough(node1, node2, Direction.Both).await
enterBarrier("partition-healed")@deprecated("Throttle is not implemented, use blackhole and passThrough.", "2.8.0")
def throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Double): Future[Done]Note: Throttling functionality is deprecated. Use blackhole and passThrough for network failure simulation.
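For tests migrating away from throttle, a rough sketch of the substitution: instead of shaping the link's bandwidth, the phase under test is bracketed by cutting and restoring the link entirely. This is an approximation, not an equivalent; it assumes the snippet runs on the conductor node with node1 and node2 defined in the MultiNodeConfig.

// Before (deprecated, not implemented):
// testConductor.throttle(node1, node2, Direction.Both, rateMBit = 0.1).await

// After: drop all traffic for the critical phase, then restore it
testConductor.blackhole(node1, node2, Direction.Both).await
enterBarrier("link-down")
// ... exercise the behavior that must tolerate the broken link ...
testConductor.passThrough(node1, node2, Direction.Both).await
enterBarrier("link-restored")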
def disconnect(node: RoleName, target: RoleName): Future[Done]

Gracefully shuts down the network connection between two nodes.
Usage Example:
// Gracefully disconnect node1 from node2
testConductor.disconnect(node1, node2).await
enterBarrier("disconnected")
// Test behavior with broken connection...

def abort(node: RoleName, target: RoleName): Future[Done]

Forcefully terminates the network connection between two nodes using a TCP reset (RST).
Usage Example:
// Forcefully abort connection (simulates network cable unplugged)
testConductor.abort(node1, node2).await
enterBarrier("connection-aborted")def shutdown(node: RoleName): Future[Done]
def shutdown(node: RoleName, abort: Boolean): Future[Done]Shuts down a remote node's ActorSystem either gracefully (single parameter version) or with specified abort behavior.
Usage Example:
// Graceful shutdown (default)
testConductor.shutdown(worker1).await
// Explicit graceful shutdown
testConductor.shutdown(worker2, abort = false).await
// Abrupt shutdown (simulates kill -9)
testConductor.shutdown(worker3, abort = true).await
enterBarrier("nodes-shutdown")Parameters:
node: RoleName - Node to shut downabort: Boolean - If true, shutdown abruptly; if false, shutdown gracefullydef exit(node: RoleName, exitValue: Int): Future[Done]Terminates a remote node using System.exit with the specified exit code.
Usage Example:
// Exit node with specific exit code
testConductor.exit(problematicNode, exitValue = 1).await
enterBarrier("node-exited")def removeNode(node: RoleName): Future[Done]Removes a node from the test coordination, allowing remaining nodes to continue.
Usage Example:
// Remove node from coordination
testConductor.removeNode(finishedNode).await
// Continue test with remaining nodes
enterBarrier("node-removed")def getNodes: Future[Iterable[RoleName]]Returns the collection of currently connected nodes.
Usage Example:
val connectedNodes = testConductor.getNodes.await
log.info(s"Connected nodes: ${connectedNodes.mkString(", ")}")
// Wait for all expected nodes
within(30.seconds) {
  awaitCond(testConductor.getNodes.await.size == expectedNodeCount)
}

The Player trait provides client-side functionality for participating in coordinated tests.
def startClient(name: RoleName, controllerAddr: InetSocketAddress): Future[Done]

Connects to the TestConductor controller and registers as a test participant.
Usage Example:
val controllerAddr = new InetSocketAddress("testserver", 4711)
testConductor.startClient(myself, controllerAddr).await
log.info("Successfully connected to test controller")Parameters:
name: RoleName - Role name of this client nodecontrollerAddr: InetSocketAddress - Address of the controller to connect toReturns: Future[Done] - Completes when client is connected and all participants are ready
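To show how the Conductor and Player roles fit together, here is a rough sketch of manual wiring in which one node starts the controller (its own client connection starts as part of that call, as noted above) and the remaining nodes join as players. The conductorNode role and the hard-coded controller address are assumptions for illustration; in a MultiNodeSpec this wiring is performed for you.

// Hypothetical manual setup; MultiNodeSpec normally does this automatically.
if (myself == conductorNode) {
  // The conductor binds the controller and is registered as a participant itself.
  val boundAddr = testConductor.startController(
    participants = roles.size,
    name = myself,
    controllerPort = new InetSocketAddress("conductor-host", 4711) // assumed, pre-agreed address
  ).await
  log.info(s"Conductor ready at $boundAddr")
} else {
  // Every other node connects to the agreed controller address and registers as a player.
  testConductor.startClient(myself, new InetSocketAddress("conductor-host", 4711)).await
}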
def enter(name: String*): Unit
def enter(timeout: Timeout, name: immutable.Seq[String]): Unit

Enters the named barriers in sequence, blocking until all participants have reached each barrier.
Usage Example:
// Use default timeout
testConductor.enter("startup", "initialization", "ready")
// Use custom timeout
import akka.util.Timeout
import scala.concurrent.duration._
val customTimeout = Timeout(60.seconds)
testConductor.enter(customTimeout, Seq("slow-barrier", "completion"))

Parameters:
- name: String* or name: immutable.Seq[String] - Barrier names to enter in sequence
- timeout: Timeout - Maximum time to wait for all participants

Throws: TimeoutException if any barrier times out
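Within a MultiNodeSpec, the enterBarrier helper used throughout the examples in this document is backed by the Player's enter with the configured barrier timeout, so the two calls below wait at the same kind of barrier. A minimal sketch reusing the Timeout and duration imports shown above; the barrier names are illustrative:

// Equivalent ways for this node to wait at a barrier (use one or the other, not both)
enterBarrier("ready")            // MultiNodeSpec helper, default barrier timeout
// testConductor.enter("ready")  // the direct Player call behind the helper

// An explicit, longer timeout for a barrier that is expected to be slow to reach
testConductor.enter(Timeout(2.minutes), Seq("bulk-data-loaded"))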
def getAddressFor(name: RoleName): Future[Address]

Queries the controller for the transport address of a specific node.
Usage Example:
val remoteAddress = testConductor.getAddressFor(workerNode).await
val remoteActorPath = RootActorPath(remoteAddress) / "user" / "service"
val selection = system.actorSelection(remoteActorPath)
selection ! "Hello from another node"Parameters:
name: RoleName - Role name to get address forReturns: Future[Address] - Transport address of the specified node
The TestConductor extension reads configuration from the akka.testconductor section:
object Settings {
  val ConnectTimeout: FiniteDuration       // Connection timeout
  val ClientReconnects: Int                // Number of reconnection attempts
  val ReconnectBackoff: FiniteDuration     // Backoff between reconnects
  val BarrierTimeout: Timeout              // Default barrier timeout
  val QueryTimeout: Timeout                // Default query timeout
  val PacketSplitThreshold: FiniteDuration // Packet splitting threshold
  val ServerSocketWorkerPoolSize: Int      // Server worker pool size
  val ClientSocketWorkerPoolSize: Int      // Client worker pool size
}

Default Configuration:
akka.testconductor {
  connect-timeout = 20s
  client-reconnects = 30
  reconnect-backoff = 1s
  barrier-timeout = 30s
  query-timeout = 5s
  packet-split-threshold = 100ms
  netty {
    server-socket-worker-pool {
      pool-size-min = 1
      pool-size-factor = 1.0
      pool-size-max = 2
    }
    client-socket-worker-pool {
      pool-size-min = 1
      pool-size-factor = 1.0
      pool-size-max = 2
    }
  }
}
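These settings can be overridden like any other Akka configuration, for example from the shared MultiNodeConfig. A minimal sketch with illustrative values, raising the timeouts for a slow CI environment:

// In the shared MultiNodeConfig; the values below are examples only
commonConfig(ConfigFactory.parseString("""
  akka.testconductor {
    barrier-timeout = 120s
    query-timeout = 20s
    connect-timeout = 60s
  }
"""))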
sealed trait Direction

object Direction {
  case object Send extends Direction    // Outgoing traffic
  case object Receive extends Direction // Incoming traffic
  case object Both extends Direction    // Both directions
}

Complete Example:

object NetworkFailureConfig extends MultiNodeConfig {
  val controller = role("controller")
  val worker1 = role("worker1")
  val worker2 = role("worker2")

  // Enable network failure injection
  testTransport(on = true)

  commonConfig(ConfigFactory.parseString("""
    akka {
      actor.provider = cluster
      remote.artery.canonical.port = 0
      cluster.auto-down-unreachable-after = 5s
    }
    """))
}
class NetworkFailureTest extends MultiNodeSpec(NetworkFailureConfig)
  with AnyWordSpecLike with Matchers with BeforeAndAfterAll with ImplicitSender {

  import NetworkFailureConfig._

  def initialParticipants = roles.size

  // Hook the multi-node test lifecycle into ScalaTest
  override def beforeAll(): Unit = multiNodeSpecBeforeAll()
  override def afterAll(): Unit = multiNodeSpecAfterAll()
"Network failure injection" must {
"create network partition" in {
enterBarrier("startup")
// Create partition between worker nodes
runOn(controller) {
testConductor.blackhole(worker1, worker2, Direction.Both).await
}
enterBarrier("partition-created")
// Test behavior during partition
runOn(worker1) {
// worker1 cannot communicate with worker2
val selection = system.actorSelection(node(worker2) / "user" / "test")
selection ! "ping"
// Message should not reach worker2
expectNoMessage(3.seconds)
}
enterBarrier("partition-tested")
// Restore network
runOn(controller) {
testConductor.passThrough(worker1, worker2, Direction.Both).await
}
enterBarrier("partition-healed")
// Verify communication restored
runOn(worker2) {
system.actorOf(Props(new Actor {
def receive = {
case "ping" => sender() ! "pong"
}
}), "test")
}
enterBarrier("actor-created")
runOn(worker1) {
val selection = system.actorSelection(node(worker2) / "user" / "test")
selection ! "ping"
expectMsg("pong")
}
enterBarrier("communication-restored")
}
"simulate node failure" in {
runOn(controller) {
// Simulate sudden node death
testConductor.abort(worker1, worker2).await
testConductor.exit(worker2, 1).await
}
runOn(worker1) {
// worker1 should detect worker2 as unreachable
within(10.seconds) {
awaitCond(Cluster(system).state.unreachable.nonEmpty)
}
}
enterBarrier("node-failure-detected")
}
}
}