Manual time control and time dilation utilities for deterministic testing of time-sensitive actor behaviors including scheduled messages, timeouts, and time-based logic.
Manual scheduler control for deterministic testing of time-sensitive behaviors.
/**
 * Factory methods and configuration for the ManualTime controller
 */
object ManualTime {
  /**
   * Config that switches the actor system to the manually triggered scheduler.
   * The test's ActorSystem must be started with this config, otherwise `apply()` fails.
   */
  val config: com.typesafe.config.Config

  /**
   * Create the manual time controller for an actor system
   * @param system actor system that was started with [[ManualTime.config]]
   */
  def apply()(implicit system: akka.actor.typed.ActorSystem[_]): ManualTime
}
/**
* Manual time controller for deterministic testing
*/
trait ManualTime {
/** Test scheduler that can be manually advanced */
def scheduler: TestScheduler
/** Advance time manually by specified amount */
def timePasses(amount: FiniteDuration): Unit
/** Assert no messages received during time advancement */
def expectNoMessageFor(duration: FiniteDuration, on: TestProbe[_]*): Unit
}

Usage Examples:
import akka.actor.testkit.typed.scaladsl.{ActorTestKit, ManualTime}
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import scala.concurrent.duration._
// ManualTime only works when the actor system runs on the manually triggered scheduler,
// so ManualTime.config is layered on top of the application test config
val testKit = ActorTestKit(ManualTime.config.withFallback(ActorTestKit.ApplicationTestConfig))
// ManualTime() takes the actor system implicitly; pass the test kit's system explicitly
// because no implicit ActorSystem is in scope here
val manualTime = ManualTime()(testKit.system)
// Behavior with timer
/**
 * Behavior that starts a single "timeout" timer during setup and logs every message it receives.
 *
 * The timer is started with message "TIMEOUT" and a 5 second delay; no message changes the behavior.
 */
def timerBehavior(): Behavior[String] = Behaviors.withTimers { timers =>
  timers.startSingleTimer("timeout", "TIMEOUT", 5.seconds)
  Behaviors.receiveMessage { message =>
    // The timer's own message gets a dedicated log line; anything else is echoed back
    val logLine = if (message == "TIMEOUT") "Timer expired!" else s"Received: $message"
    println(logLine)
    Behaviors.same
  }
}
// Spawn the behavior under test; its single timer is started inside the behavior's setup block
val actor = testKit.spawn(timerBehavior())
// NOTE(review): timerBehavior only prints and never sends to this probe, so the
// probe check below cannot observe the timeout - confirm whether that is intended
val probe = testKit.createTestProbe[String]()
// Time hasn't passed yet - no timeout
probe.expectNoMessage(100.millis)
// Manually advance time by the same 5 seconds the timer was started with
manualTime.timePasses(5.seconds)
// Now timeout should have fired
// (In a real implementation, you'd verify the timeout behavior)

Implicit class providing time dilation functionality for individual durations.
/**
* Implicit class providing dilated method for FiniteDuration
*/
implicit class TestDuration(val duration: FiniteDuration) extends AnyVal {
/** Apply time dilation from TestKitSettings to this duration */
def dilated(implicit settings: TestKitSettings): FiniteDuration
}

Usage Examples:
import akka.actor.testkit.typed.scaladsl._
import scala.concurrent.duration._
// Plain test kit; this example uses real time, only scaled by the dilation factor
val testKit = ActorTestKit()
// `dilated` takes an implicit TestKitSettings, so the test kit's settings are imported here
import testKit.testKitSettings // Brings implicit TestKitSettings into scope
// Apply dilation to specific durations
val originalTimeout = 1.second
val dilatedTimeout = originalTimeout.dilated // Scaled by TestKitSettings.TestTimeFactor
// Probe used below to show a dilated duration passed as an explicit timeout
val probe = testKit.createTestProbe[String]()
probe.expectMessage(100.millis.dilated, "expected message")

Manual scheduler implementation for controlling time advancement in tests.
/**
* Test scheduler that allows manual time advancement
*/
trait TestScheduler extends Scheduler {
/** Advance scheduler time manually */
def advance(duration: FiniteDuration): Unit
/** Get current scheduler time */
def currentTime: Long
/** Schedule task with manual time control */
def scheduleOnce(delay: FiniteDuration, runnable: Runnable): Cancellable
/** Schedule recurring task with manual time control */
def scheduleWithFixedDelay(
initialDelay: FiniteDuration,
delay: FiniteDuration,
runnable: Runnable
): Cancellable
}

Utilities for testing behaviors that use Akka timers and scheduled operations.
/**
* Assert no messages received during manual time advancement
* @param duration Amount of time to advance
* @param on Test probes to check for messages
*/
def expectNoMessageFor(duration: FiniteDuration, on: TestProbe[_]*): Unit
/**
* Advance time and trigger any scheduled operations
* @param amount Duration to advance time by
*/
def timePasses(amount: FiniteDuration): Unit

Usage Examples:
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import scala.concurrent.duration._
// ManualTime only works when the actor system runs on the manually triggered scheduler,
// so the test kit must be started with ManualTime.config
val testKit = ActorTestKit(ManualTime.config)
// ManualTime() takes the actor system implicitly; pass the test kit's system explicitly
// because no implicit ActorSystem is in scope here
val manualTime = ManualTime()(testKit.system)
// Behavior with periodic timer
/**
 * Behavior that forwards a "heartbeat" to `probe` each time its fixed-delay timer fires.
 *
 * @param probe recipient of the "heartbeat" notifications
 * @return behavior handling "BEAT" (timer tick) and "stop" (cancel the timer and stop)
 */
def heartbeatBehavior(probe: ActorRef[String]): Behavior[String] =
  Behaviors.withTimers { timers =>
    val heartbeatKey = "heartbeat"
    timers.startTimerWithFixedDelay(heartbeatKey, "BEAT", 1.second)

    // Timer tick: notify the probe and keep running
    def onBeat(): Behavior[String] = {
      probe ! "heartbeat"
      Behaviors.same
    }

    // Stop request: cancel the heartbeat timer, then stop the actor
    def onStop(): Behavior[String] = {
      timers.cancel(heartbeatKey)
      Behaviors.stopped
    }

    Behaviors.receiveMessage {
      case "BEAT" => onBeat()
      case "stop" => onStop()
    }
  }
// The probe's ref is handed to the behavior, so every "heartbeat" it sends lands here
val probe = testKit.createTestProbe[String]()
val actor = testKit.spawn(heartbeatBehavior(probe.ref))
// No heartbeat yet
probe.expectNoMessage(100.millis)
// Advance time - first heartbeat
// (1 second matches the delay passed to startTimerWithFixedDelay)
manualTime.timePasses(1.second)
probe.expectMessage("heartbeat")
// Advance time - second heartbeat
manualTime.timePasses(1.second)
probe.expectMessage("heartbeat")
// Verify no messages during advancement
manualTime.expectNoMessageFor(500.millis, probe)

Testing behaviors with timeout logic using manual time control.
Usage Examples:
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import scala.concurrent.duration._
// ManualTime only works when the actor system runs on the manually triggered scheduler,
// so the test kit must be started with ManualTime.config
val testKit = ActorTestKit(ManualTime.config)
// ManualTime() takes the actor system implicitly; pass the test kit's system explicitly
// because no implicit ActorSystem is in scope here
val manualTime = ManualTime()(testKit.system)
// Behavior with request timeout
/**
 * Two-state behavior modelling a request that must be answered within 30 seconds.
 *
 * It starts idle; "request" moves it to the waiting state, and either "response" or the
 * "TIMEOUT" timer message moves it back to idle.
 */
def requestTimeoutBehavior(): Behavior[String] = Behaviors.withTimers { timers =>
  val timeoutKey = "request-timeout"

  // Resting state: only "request" has an effect, everything else is ignored
  def idle(): Behavior[String] =
    Behaviors.receiveMessage { message =>
      if (message == "request") {
        println("Making request")
        waitingForResponse()
      } else {
        Behaviors.same
      }
    }

  // Waiting state: the timeout timer is started each time this state is entered
  def waitingForResponse(): Behavior[String] = {
    timers.startSingleTimer(timeoutKey, "TIMEOUT", 30.seconds)
    Behaviors.receiveMessage {
      case "request" =>
        println("Already waiting for response")
        Behaviors.same
      case "TIMEOUT" =>
        println("Request timed out")
        idle()
      case "response" =>
        // The answer arrived, so the pending timeout is cancelled
        timers.cancel(timeoutKey)
        println("Response received in time")
        idle()
    }
  }

  idle()
}
// NOTE(review): requestTimeoutBehavior only prints, so nothing below asserts on the
// outcome - the steps illustrate the sequence of time advancement only
val actor = testKit.spawn(requestTimeoutBehavior())
// Start request
actor ! "request"
// Don't send response - let it timeout
// (30 seconds matches the delay of the "request-timeout" timer)
manualTime.timePasses(30.seconds)
// Timeout should have triggered
// Test successful response within timeout
actor ! "request"
manualTime.timePasses(15.seconds) // Only half the timeout
actor ! "response" // Should cancel timeout

Testing behaviors that schedule messages to themselves or other actors.
Usage Examples:
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import scala.concurrent.duration._
// ManualTime only works when the actor system runs on the manually triggered scheduler,
// so the test kit must be started with ManualTime.config
val testKit = ActorTestKit(ManualTime.config)
// ManualTime() takes the actor system implicitly; pass the test kit's system explicitly
// because no implicit ActorSystem is in scope here
val manualTime = ManualTime()(testKit.system)
// Behavior that schedules delayed processing
/**
 * Behavior that delays every incoming message by 2 seconds before handing it to `processor`.
 *
 * Each incoming message is wrapped as "PROCESS:<msg>" and sent back to the actor by a
 * single timer; when that wrapped message arrives, "Processed: <msg>" is sent to `processor`.
 *
 * Note: any message that already starts with "PROCESS:" is treated as a timer delivery
 * and forwarded immediately.
 *
 * @param processor recipient of the "Processed: ..." results
 */
def delayedProcessingBehavior(processor: ActorRef[String]): Behavior[String] =
  Behaviors.withTimers { timers =>
    val processPrefix = "PROCESS:"

    // `scheduledSoFar` is threaded through the behavior instead of a mutable counter;
    // it only serves to give every timer a distinct key
    def handling(scheduledSoFar: Int): Behavior[String] =
      Behaviors.receiveMessage {
        // Must come before the catch-all case, otherwise timer deliveries would be
        // re-scheduled forever and never reach the processor
        case scheduled if scheduled.startsWith(processPrefix) =>
          val actualMsg = scheduled.stripPrefix(processPrefix)
          processor ! s"Processed: $actualMsg"
          Behaviors.same
        case msg =>
          val sequence = scheduledSoFar + 1
          // Schedule processing after 2 seconds
          timers.startSingleTimer(s"process-$sequence", s"$processPrefix$msg", 2.seconds)
          handling(sequence)
      }

    handling(0)
  }
// The probe stands in for the downstream processor and receives the "Processed: ..." results
val processor = testKit.createTestProbe[String]()
val actor = testKit.spawn(delayedProcessingBehavior(processor.ref))
// Send message - won't be processed immediately
actor ! "message1"
processor.expectNoMessage(100.millis)
// Advance time - should trigger processing
// (2 seconds matches the delay passed to startSingleTimer)
manualTime.timePasses(2.seconds)
processor.expectMessage("Processed: message1")
// Test multiple scheduled messages
actor ! "message2"
actor ! "message3"
// Advance time - both should be processed
manualTime.timePasses(2.seconds)
processor.expectMessage("Processed: message2")
processor.expectMessage("Processed: message3")

Combining manual time control with real actor system testing for complex time-dependent scenarios.
Usage Examples:
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import scala.concurrent.duration._
// ManualTime only works when the actor system runs on the manually triggered scheduler,
// so the test kit must be started with ManualTime.config
val testKit = ActorTestKit(ManualTime.config)
// ManualTime() takes the actor system implicitly; pass the test kit's system explicitly
// because no implicit ActorSystem is in scope here
val manualTime = ManualTime()(testKit.system)
// Complex behavior with multiple timers
/**
 * Behavior combining a periodic health-check timer with a one-shot inactivity timeout.
 *
 * It starts active. While active it handles "HEALTH_CHECK", "INACTIVE_TIMEOUT", "activity"
 * and "shutdown"; once inactive, only "activity" has an effect and makes it active again.
 */
def multiTimerBehavior(): Behavior[String] = Behaviors.withTimers { timers =>
  val healthCheckKey = "health-check"
  val inactivityKey = "inactivity-timeout"

  // (Re)start the 60 second inactivity timeout
  def armInactivityTimeout(): Unit =
    timers.startSingleTimer(inactivityKey, "INACTIVE_TIMEOUT", 60.seconds)

  // Inactive state: waits for "activity" and ignores everything else
  def inactive(): Behavior[String] =
    Behaviors.receiveMessage { message =>
      if (message == "activity") {
        println("Becoming active")
        active()
      } else {
        Behaviors.same
      }
    }

  // Active state: both timers are started each time this state is entered
  def active(): Behavior[String] = {
    // Health check every 10 seconds
    timers.startTimerWithFixedDelay(healthCheckKey, "HEALTH_CHECK", 10.seconds)
    // Timeout after 60 seconds of inactivity
    armInactivityTimeout()
    Behaviors.receiveMessage {
      case "activity" =>
        // Reset inactivity timeout
        armInactivityTimeout()
        Behaviors.same
      case "HEALTH_CHECK" =>
        println("Health check")
        Behaviors.same
      case "shutdown" =>
        timers.cancelAll()
        Behaviors.stopped
      case "INACTIVE_TIMEOUT" =>
        println("Going inactive due to timeout")
        timers.cancel(healthCheckKey)
        inactive()
    }
  }

  active()
}
// NOTE(review): multiTimerBehavior only prints, so the steps below illustrate the
// timeline without asserting on it
val actor = testKit.spawn(multiTimerBehavior())
// Test health checks
manualTime.timePasses(10.seconds)
// First health check
manualTime.timePasses(10.seconds)
// Second health check
// Test inactivity timeout
// (10 + 10 + 40 seconds adds up to the 60 second inactivity timeout)
manualTime.timePasses(40.seconds) // Total 60 seconds since last activity
// Should timeout and go inactive
// Test reactivation
actor ! "activity"
// Should become active again
manualTime.timePasses(10.seconds)
// Health checks should resume