Akka TestKit provides specialized actor references that enable synchronous access to actor internals and state, making it possible to test actor behavior in ways that would be impossible with normal actor references.
/**
 * Test-only actor reference that exposes the wrapped actor instance and lets
 * messages be processed directly on the caller's thread.
 */
class TestActorRef[T <: Actor](
  _system: ActorSystem,
  _props: Props,
  _supervisor: ActorRef,
  name: String
) extends InternalActorRef {
  /** Processes `o` directly, without going through a regular message send. */
  def receive(o: Any): Unit

  /** Same as `receive(o)`, with an explicit `sender` that replies are addressed to. */
  def receive(o: Any, sender: ActorRef): Unit

  /** The underlying actor instance, for inspecting internals in tests. */
  def underlyingActor: T

  /** Starts watching `subject`. NOTE(review): presumably returns `subject` — confirm against the Akka API docs. */
  def watch(subject: ActorRef): ActorRef

  /** Stops watching `subject`. NOTE(review): presumably returns `subject` — confirm against the Akka API docs. */
  def unwatch(subject: ActorRef): ActorRef
}

A specialized actor reference that allows synchronous access to the underlying actor instance and direct message processing. This enables white-box testing of actor internals.
/** Factory methods for creating `TestActorRef` instances. */
object TestActorRef {
  // Scala API: the ActorSystem is supplied implicitly.
  def apply[T <: Actor](props: Props)(implicit system: ActorSystem): TestActorRef[T]
  def apply[T <: Actor](props: Props, name: String)(implicit system: ActorSystem): TestActorRef[T]
  def apply[T <: Actor](props: Props, supervisor: ActorRef)(implicit system: ActorSystem): TestActorRef[T]
  def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T]

  // Java API: the ActorSystem is passed explicitly as the first argument.
  def create[T <: Actor](system: ActorSystem, props: Props): TestActorRef[T]
  def create[T <: Actor](system: ActorSystem, props: Props, name: String): TestActorRef[T]
  def create[T <: Actor](system: ActorSystem, props: Props, supervisor: ActorRef): TestActorRef[T]
  def create[T <: Actor](system: ActorSystem, props: Props, supervisor: ActorRef, name: String): TestActorRef[T]
}

import akka.testkit.TestActorRef
import akka.actor.{Actor, Props}
/**
 * Minimal counter actor used to demonstrate white-box testing.
 *
 * Understands three string commands: "increment", "reset" and "get"
 * (the latter replies to the sender with the current count).
 */
class CounterActor extends Actor {
  private var count = 0

  /** Exposes the current count so tests can read it straight from the actor instance. */
  def getCount: Int = count

  override def receive: Receive = {
    case "increment" => count = count + 1
    case "reset"     => count = 0
    case "get"       => sender() ! count
  }
}
// Create a TestActorRef wrapping CounterActor
val counterRef = TestActorRef[CounterActor](Props[CounterActor])

// Access the underlying actor instance
val counter = counterRef.underlyingActor
println(s"Initial count: ${counter.getCount}") // 0

// Process messages synchronously
counterRef.receive("increment")
counterRef.receive("increment")
println(s"Count after increments: ${counter.getCount}") // 2

// Test with an explicit sender so the reply can be asserted
counterRef.receive("get", testActor)
expectMsg(2)

// Create a TestActorRef with an explicit name
val namedRef = TestActorRef[CounterActor](Props[CounterActor], "test-counter")
println(s"Actor path: ${namedRef.path}") // /user/test-counter

// Create the supervisor that the supervised TestActorRef is attached to.
// This declaration must sit on its own line: when appended to the println
// above it becomes part of that line's trailing comment and is never defined.
val supervisor = system.actorOf(Props[SupervisorActor])
val supervisedRef = TestActorRef[CounterActor](Props[CounterActor], supervisor, "supervised-counter")

/**
 * Actor holding a single string state.
 *
 * "get-state" replies to the sender with the current state; any other
 * String message replaces the state with that string.
 */
class StatefulActor extends Actor {
  private var state = "initial"

  def receive = {
    // The literal must be matched before the generic String pattern:
    // "get-state" is itself a String, so with the cases the other way round
    // it would be stored as the new state and the query could never be answered.
    case "get-state"      => sender() ! state
    case newState: String => state = newState
  }

  /** Current state, exposed for direct inspection in tests. */
  def getCurrentState: String = state
}
// Wrap StatefulActor in a TestActorRef and keep a handle on the actor instance for direct inspection.
val actorRef = TestActorRef[StatefulActor](Props[StatefulActor])
val actor = actorRef.underlyingActor
// Test state changes: every receive() is followed by a direct read of the actor's state.
assert(actor.getCurrentState == "initial")
actorRef.receive("active")
assert(actor.getCurrentState == "active")
actorRef.receive("inactive")
assert(actor.getCurrentState == "inactive")

/**
 * Actor used to demonstrate exception testing: throws a RuntimeException
 * on "fail" and replies "done" to the sender on "work".
 */
class FaultyActor extends Actor {
  def receive = {
    case "fail" => throw new RuntimeException("Test failure")
    case "work" => sender() ! "done"
  }
}
val faultyRef = TestActorRef[FaultyActor](Props[FaultyActor])
// Test exception handling: the RuntimeException thrown for "fail" is expected to surface from receive() into intercept.
intercept[RuntimeException] {
faultyRef.receive("fail")
}
// Normal operation still works; testActor is passed as the sender so the reply can be asserted.
faultyRef.receive("work", testActor)
expectMsg("done")

/**
 * TestActorRef variant for FSM actors, giving tests direct access to the
 * FSM's state and timers.
 *
 * @tparam S type of the FSM state names
 * @tparam D type of the FSM state data
 * @tparam T type of the FSM actor
 */
class TestFSMRef[S, D, T <: Actor](
  system: ActorSystem,
  props: Props,
  supervisor: ActorRef,
  name: String
) extends TestActorRef[T] {
  /** Name of the state the FSM is currently in. */
  def stateName: S

  /** Data attached to the current state. */
  def stateData: D

  /** Puts the FSM into `stateName` with `stateData`. */
  def setState(stateName: S, stateData: D): Unit

  /** As above, additionally setting a state `timeout`. */
  def setState(stateName: S, stateData: D, timeout: FiniteDuration): Unit

  /** As above, with a `stopReason`. NOTE(review): presumably stops the FSM when defined — confirm against the Akka API docs. */
  def setState(stateName: S, stateData: D, timeout: FiniteDuration, stopReason: Option[FSM.Reason]): Unit

  /** Starts the named timer delivering `msg` after `timeout`; `repeat` selects a repeating timer. */
  def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean = false): Unit

  /** Cancels the named timer. */
  def cancelTimer(name: String): Unit

  /** Whether the named timer is currently active. */
  def isTimerActive(name: String): Boolean

  /** Whether a state timeout timer is currently active. */
  def isStateTimerActive: Boolean
}

Specialized TestActorRef for testing Finite State Machine actors, providing direct access to FSM state and timer control.
/**
 * Factory methods for `TestFSMRef`. The implicit evidence `ev` requires the
 * actor type `T` to be an `FSM[S, D]`; `factory` is a by-name expression
 * that creates the actor.
 */
object TestFSMRef {
  def apply[S, D, T <: Actor: ClassTag](factory: => T)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T]
  def apply[S, D, T <: Actor: ClassTag](factory: => T, name: String)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T]
  def apply[S, D, T <: Actor: ClassTag](factory: => T, supervisor: ActorRef)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T]
  def apply[S, D, T <: Actor: ClassTag](factory: => T, supervisor: ActorRef, name: String)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T]
}

import akka.actor.{FSM, Actor}
import akka.testkit.TestFSMRef
import scala.concurrent.duration._
// FSM states: the sealed set of state names used by the example FSMs (Idle, Active, Stopped).
sealed trait State
case object Idle extends State
case object Active extends State
case object Stopped extends State
// FSM data: the sealed set of values carried alongside a state — either nothing (Empty) or a task description (Todo).
sealed trait Data
case object Empty extends Data
case class Todo(task: String) extends Data
/**
 * Three-state worker FSM: Idle --"start"--> Active --"stop"--> Stopped.
 *
 * While Active, "work" replies to the sender with the processed task and
 * keeps the FSM in the same state. Stopped ignores every event.
 */
class WorkerFSM extends Actor with FSM[State, Data] {
  startWith(Idle, Empty)

  when(Idle) {
    case Event("start", Empty) => goto(Active).using(Todo("initial task"))
  }

  when(Active) {
    case Event("stop", _) => goto(Stopped).using(Empty)
    case Event("work", Todo(currentTask)) =>
      // Process task
      sender() ! s"processed: $currentTask"
      stay()
  }

  when(Stopped) {
    case Event(_, _) => stay()
  }
}
// Create TestFSMRef
val fsmRef = TestFSMRef(new WorkerFSM)

// Test initial state
assert(fsmRef.stateName == Idle)
assert(fsmRef.stateData == Empty)

// Test state transitions
fsmRef ! "start"
assert(fsmRef.stateName == Active)
assert(fsmRef.stateData == Todo("initial task"))

// Test message processing in state
fsmRef.receive("work", testActor)
expectMsg("processed: initial task")
assert(fsmRef.stateName == Active) // Still active

// Test final transition
fsmRef ! "stop"
assert(fsmRef.stateName == Stopped)
assert(fsmRef.stateData == Empty)

// --- Separate example: forcing the FSM into a specific state ---
// (standalone snippet: it starts from a fresh FSM and reuses the name fsmRef)
val fsmRef = TestFSMRef(new WorkerFSM)

// Manually set FSM to specific state
fsmRef.setState(Active, Todo("manual task"))
assert(fsmRef.stateName == Active)
assert(fsmRef.stateData == Todo("manual task"))

// Set state with timeout
fsmRef.setState(Active, Todo("timed task"), 5.seconds)
assert(fsmRef.isStateTimerActive)

/**
 * FSM demonstrating both timer kinds:
 *  - Idle has a 3 second state timeout that moves the FSM to Active;
 *  - Active manages a named repeating timer ("reminder") via
 *    "set-reminder" / "cancel-reminder" and answers each "reminder-tick"
 *    with "tick".
 */
class TimedFSM extends Actor with FSM[State, Data] {
  startWith(Idle, Empty)

  when(Idle, stateTimeout = 3.seconds) {
    case Event(StateTimeout, _) =>
      goto(Active) using Todo("timeout triggered")
    case Event("manual-start", _) =>
      goto(Active) using Todo("manual start")
  }

  when(Active) {
    case Event("set-reminder", _) =>
      setTimer("reminder", "reminder-tick", 1.second, repeat = true)
      stay()
    case Event("reminder-tick", _) =>
      sender() ! "tick"
      stay()
    case Event("cancel-reminder", _) =>
      cancelTimer("reminder")
      stay()
  }

  // startWith only records the initial state; initialize() performs the
  // transition into it and sets up its timers. Without this call the Idle
  // state timeout is never scheduled, so isStateTimerActive stays false.
  initialize()
}
val timedFsmRef = TestFSMRef(new TimedFSM)

// Test state timeout: Idle declares a state timeout, so its timer should be pending
assert(timedFsmRef.stateName == Idle)
assert(timedFsmRef.isStateTimerActive)

// Test custom timers: jump straight to Active, then let the FSM start its own timer
timedFsmRef.setState(Active, Empty)
timedFsmRef.receive("set-reminder")
assert(timedFsmRef.isTimerActive("reminder"))

// Manually trigger timer by delivering the tick message ourselves
timedFsmRef.receive("reminder-tick", testActor)
expectMsg("tick")

// Cancel timer
timedFsmRef.receive("cancel-reminder")
assert(!timedFsmRef.isTimerActive("reminder"))

// Manual timer control through the TestFSMRef itself
timedFsmRef.setTimer("test-timer", "test-message", 500.millis)
assert(timedFsmRef.isTimerActive("test-timer"))
timedFsmRef.cancelTimer("test-timer")
assert(!timedFsmRef.isTimerActive("test-timer"))

import akka.actor.FSM
/**
 * FSM that terminates itself from Idle: "shutdown" stops with
 * FSM.Shutdown, "failure" stops with an FSM.Failure reason.
 */
class ComplexFSM extends Actor with FSM[State, Data] {
  startWith(Idle, Empty)

  when(Idle) {
    case Event("failure", _)  => stop(FSM.Failure("Simulated failure"))
    case Event("shutdown", _) => stop(FSM.Shutdown)
  }
}
val complexFsmRef = TestFSMRef(new ComplexFSM)
// Test stop with reason: watch the FSM, send "shutdown" (handled with stop(FSM.Shutdown)), then expect its termination.
watch(complexFsmRef)
complexFsmRef ! "shutdown"
expectTerminated(complexFsmRef)
// For failure testing: a fresh instance is created and sent "failure", which is handled with stop(FSM.Failure(...)).
val anotherFsmRef = TestFSMRef(new ComplexFSM)
watch(anotherFsmRef)
anotherFsmRef ! "failure"
expectTerminated(anotherFsmRef)

| Feature | TestActorRef | Regular ActorRef |
|---|---|---|
| Message Processing | Synchronous | Asynchronous |
| Actor Access | Direct access via underlyingActor | No direct access |
| Execution Thread | Caller's thread | Dispatcher's thread |
| Performance | Faster for testing | Normal actor performance |
| Production Use | Testing only | Production ready |
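Use TestActorRef when:
- You need to inspect or assert on the actor's internal state through `underlyingActor`.
- You want messages processed synchronously while unit testing actor logic.

Use regular ActorRef when:
- You are testing the actor's observable behavior purely through the messages it receives and sends.
- You want the actor to process messages asynchronously, as it does in production.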
/**
 * Account actor keeping a running balance.
 *
 *  - Deposit(amount): adds to the balance and replies BalanceUpdated.
 *  - Withdraw(amount): subtracts and replies BalanceUpdated when the balance
 *    covers the amount, otherwise replies InsufficientFunds and leaves the
 *    balance untouched.
 *  - GetBalance: replies CurrentBalance.
 */
class AccountActor extends Actor {
  private var balance = 0.0

  /** Current balance, exposed so tests can check state directly. */
  def currentBalance: Double = balance

  override def receive: Receive = {
    case Deposit(amount) =>
      balance += amount
      sender() ! BalanceUpdated(balance)

    case Withdraw(amount) if balance >= amount =>
      balance -= amount
      sender() ! BalanceUpdated(balance)

    case Withdraw(_) =>
      sender() ! InsufficientFunds(balance)

    case GetBalance =>
      sender() ! CurrentBalance(balance)
  }
}
// TestActorRef approach - direct state testing: messages go through receive() and the balance is read straight from the actor instance.
val testRef = TestActorRef[AccountActor](Props[AccountActor])
val account = testRef.underlyingActor
testRef.receive(Deposit(100.0))
assert(account.currentBalance == 100.0) // Direct state check
// testActor is passed as the sender so the BalanceUpdated reply can be asserted as well.
testRef.receive(Withdraw(50.0), testActor)
expectMsg(BalanceUpdated(50.0))
assert(account.currentBalance == 50.0) // Direct state check
// Regular ActorRef approach - behavior testing: the actor is observed only through the replies it sends.
val regularRef = system.actorOf(Props[AccountActor])
regularRef ! Deposit(100.0)
expectMsg(BalanceUpdated(100.0))
regularRef ! GetBalance
expectMsg(CurrentBalance(100.0))
// Withdrawing more than the balance is answered with InsufficientFunds carrying the unchanged balance.
regularRef ! Withdraw(150.0)
expectMsg(InsufficientFunds(100.0))

Test both direct receive() calls and regular message sending:

// Good: Unit testing actor logic
val testRef = TestActorRef[MyActor](Props[MyActor])
testRef.receive("test-message")
assert(testRef.underlyingActor.someState == expectedValue)
// Good: Testing both approaches
testRef.receive("sync-test") // Direct call into the actor's behavior
testRef ! "async-test" // Regular send. NOTE(review): the Akka TestKit docs describe sends to a TestActorRef as processed on the calling thread, so this is likely not asynchronous — confirm
expectMsg("response")

Use setState() to test from specific states:

// Good: Comprehensive FSM testing
val fsmRef = TestFSMRef(new MyFSM)
// Test normal flow: drive the FSM with a regular event and check the resulting state.
fsmRef ! StartEvent
assert(fsmRef.stateName == Active)
// Test from specific state: setState() places the FSM in ErrorState directly, so recovery can be tested without reproducing the error.
fsmRef.setState(ErrorState, ErrorData("test error"))
fsmRef ! RecoveryEvent
assert(fsmRef.stateName == Active)
// Test timer behavior: the "cleanup" timer is expected to be running here (presumably started by MyFSM — not shown), then cancelled via the ref.
assert(fsmRef.isTimerActive("cleanup"))
fsmRef.cancelTimer("cleanup")
assert(!fsmRef.isTimerActive("cleanup"))