Akka's scheduling system provides task scheduling for delayed and periodic execution with cancellation support, plus actor-specific timer management.
Global scheduler for delayed and periodic task execution.
/**
* Task scheduler interface
*/
trait Scheduler {
/**
* Schedule periodic message sending
* @param initialDelay - Delay before first message
* @param interval - Interval between messages
* @param receiver - Target actor
* @param message - Message to send
* @param executor - Execution context
* @return Cancellable handle
*/
def schedule(
initialDelay: Duration,
interval: Duration,
receiver: ActorRef,
message: Any
)(implicit executor: ExecutionContext): Cancellable
/**
* Schedule single message after delay
* @param delay - Delay before sending
* @param receiver - Target actor
* @param message - Message to send
* @param executor - Execution context
* @return Cancellable handle
*/
def scheduleOnce(
delay: Duration,
receiver: ActorRef,
message: Any
)(implicit executor: ExecutionContext): Cancellable
/**
* Schedule periodic Runnable execution
* @param initialDelay - Delay before first execution
* @param interval - Interval between executions
* @param runnable - Task to execute
* @param executor - Execution context
* @return Cancellable handle
*/
def schedule(
initialDelay: Duration,
interval: Duration,
runnable: Runnable
)(implicit executor: ExecutionContext): Cancellable
/**
* Schedule single Runnable execution after delay
* @param delay - Delay before execution
* @param runnable - Task to execute
* @param executor - Execution context
* @return Cancellable handle
*/
def scheduleOnce(
delay: Duration,
runnable: Runnable
)(implicit executor: ExecutionContext): Cancellable
/**
* Current time in milliseconds
*/
def currentTimeMillis(): Long
}Usage Examples:
import scala.concurrent.duration._
import akka.actor._
class SchedulingActor extends Actor {
import context.{dispatcher, system}
def receive = {
case "start-periodic" =>
// Schedule periodic message to self
val cancellable = system.scheduler.schedule(
initialDelay = 1.second,
interval = 5.seconds,
receiver = self,
message = "tick"
)
// Store cancellable to cancel later
context.become(running(cancellable))
case "schedule-once" =>
// Schedule single delayed message
system.scheduler.scheduleOnce(10.seconds, self, "delayed-message")
case "schedule-runnable" =>
// Schedule periodic task execution
val task = new Runnable {
def run(): Unit = println(s"Task executed at ${System.currentTimeMillis()}")
}
system.scheduler.schedule(2.seconds, 3.seconds, task)
}
def running(cancellable: Cancellable): Receive = {
case "tick" =>
println("Periodic tick received")
case "stop-periodic" =>
cancellable.cancel()
context.unbecome()
case "delayed-message" =>
println("Delayed message received")
}
}
// Usage
val actor = system.actorOf(Props[SchedulingActor])
actor ! "start-periodic"
actor ! "schedule-once"Handle for cancelling scheduled tasks.
/**
* Handle for cancelling scheduled operations
*/
trait Cancellable {
/**
* Cancel the scheduled operation
* @return true if cancellation was successful, false if already completed
*/
def cancel(): Boolean
/**
* Check if operation has been cancelled
* @return true if cancelled
*/
def isCancelled: Boolean
}
/**
* Utility for creating pre-cancelled Cancellable
*/
object Cancellable {
/**
* Already cancelled Cancellable instance
*/
val cancelled: Cancellable
/**
* Create combined Cancellable that cancels all when cancelled
* @param cancellables - Cancellables to combine
* @return Combined Cancellable
*/
def apply(cancellables: Cancellable*): Cancellable
}Usage Examples:
class CancellableActor extends Actor {
import context.{dispatcher, system}
var scheduledTasks: List[Cancellable] = List.empty
def receive = {
case "start-tasks" =>
// Schedule multiple tasks
val task1 = system.scheduler.scheduleOnce(5.seconds, self, "task1")
val task2 = system.scheduler.schedule(1.second, 2.seconds, self, "task2")
val task3 = system.scheduler.scheduleOnce(10.seconds, self, "task3")
scheduledTasks = List(task1, task2, task3)
case "cancel-all" =>
// Cancel all scheduled tasks
scheduledTasks.foreach(_.cancel())
scheduledTasks = List.empty
sender() ! "All tasks cancelled"
case "check-status" =>
val statuses = scheduledTasks.map(_.isCancelled)
sender() ! s"Task cancellation status: $statuses"
case msg =>
println(s"Received scheduled message: $msg")
}
override def postStop(): Unit = {
// Clean up on actor stop
scheduledTasks.foreach(_.cancel())
}
}Actor-specific timer management with automatic cleanup.
/**
* Timer management for actors
*/
trait Timers { this: Actor =>
/**
* Timer scheduler instance
*/
def timers: TimerScheduler
}
/**
* Timer scheduler interface
*/
trait TimerScheduler {
/**
* Start single-shot timer
* @param key - Unique timer identifier
* @param msg - Message to send when timer fires
* @param delay - Delay before firing
*/
def startSingleTimer(key: Any, msg: Any, delay: Duration): Unit
/**
* Start timer with fixed delay between executions
* @param key - Unique timer identifier
* @param msg - Message to send repeatedly
* @param delay - Delay between executions
*/
def startTimerWithFixedDelay(key: Any, msg: Any, delay: Duration): Unit
/**
* Start timer with fixed rate (interval from start of previous execution)
* @param key - Unique timer identifier
* @param msg - Message to send repeatedly
* @param interval - Interval between execution starts
*/
def startTimerAtFixedRate(key: Any, msg: Any, interval: Duration): Unit
/**
* Cancel timer by key
* @param key - Timer identifier to cancel
*/
def cancel(key: Any): Unit
/**
* Cancel all timers
*/
def cancelAll(): Unit
/**
* Check if timer is active
* @param key - Timer identifier to check
* @return true if timer exists and is active
*/
def isTimerActive(key: Any): Boolean
}Usage Examples:
import akka.actor.{Actor, Timers}
import scala.concurrent.duration._
// Timer messages
case object Tick
case object HealthCheck
case object Cleanup
class TimerActor extends Actor with Timers {
override def preStart(): Unit = {
// Start timers on actor startup
timers.startSingleTimer("startup", "initialized", 1.second)
timers.startTimerWithFixedDelay("health", HealthCheck, 30.seconds)
}
def receive = {
case "initialized" =>
println("Actor initialized")
// Start periodic tick timer
timers.startTimerAtFixedRate("tick", Tick, 5.seconds)
case "start-cleanup" =>
// Start periodic cleanup with fixed delay
timers.startTimerWithFixedDelay("cleanup", Cleanup, 1.minute)
case "stop-cleanup" =>
timers.cancel("cleanup")
println("Cleanup timer stopped")
case Tick =>
println(s"Tick at ${System.currentTimeMillis()}")
case HealthCheck =>
println("Performing health check")
// Health check logic
case Cleanup =>
println("Performing cleanup")
// Cleanup logic
case "status" =>
val tickActive = timers.isTimerActive("tick")
val cleanupActive = timers.isTimerActive("cleanup")
sender() ! s"Tick: $tickActive, Cleanup: $cleanupActive"
case "stop-all" =>
timers.cancelAll()
println("All timers cancelled")
}
override def postStop(): Unit = {
// Timers are automatically cancelled when actor stops
println("Actor stopped, timers cleaned up")
}
}
// Usage with timer keys
class KeyedTimerActor extends Actor with Timers {
def receive = {
case ("start", key: String, delay: Duration) =>
timers.startSingleTimer(key, s"timer-$key", delay)
case ("stop", key: String) =>
timers.cancel(key)
case ("check", key: String) =>
val active = timers.isTimerActive(key)
sender() ! s"Timer $key active: $active"
case msg: String if msg.startsWith("timer-") =>
val key = msg.stripPrefix("timer-")
println(s"Timer $key fired")
}
}Common patterns for scheduling and timer usage.
/**
* Retry with exponential backoff
*/
def scheduleRetry[T](
operation: () => Future[T],
maxRetries: Int,
initialDelay: Duration,
backoffFactor: Double = 2.0
)(implicit system: ActorSystem): Future[T]
/**
* Schedule with jitter to avoid thundering herd
*/
def scheduleWithJitter(
baseDelay: Duration,
jitterFactor: Double = 0.1
): DurationUsage Examples:
import scala.util.Random
class RetryActor extends Actor with Timers {
import context.dispatcher
case class RetryableOperation(attempt: Int, maxAttempts: Int)
def receive = {
case "start-with-retry" =>
attemptOperation(1, 3)
case RetryableOperation(attempt, maxAttempts) =>
performOperation() match {
case Some(result) =>
sender() ! s"Success after $attempt attempts: $result"
case None if attempt < maxAttempts =>
// Exponential backoff with jitter
val baseDelay = (math.pow(2, attempt) * 1000).millis
val jitter = (Random.nextDouble() * 0.2 + 0.9) // 90-110% of base delay
val delay = (baseDelay.toMillis * jitter).millis
timers.startSingleTimer(
s"retry-$attempt",
RetryableOperation(attempt + 1, maxAttempts),
delay
)
case None =>
sender() ! s"Failed after $maxAttempts attempts"
}
}
private def attemptOperation(attempt: Int, maxAttempts: Int): Unit = {
self ! RetryableOperation(attempt, maxAttempts)
}
private def performOperation(): Option[String] = {
// Simulate operation that might fail
if (Random.nextDouble() < 0.3) Some("Operation result")
else None
}
}
// Batching with timers
class BatchingActor extends Actor with Timers {
import context.dispatcher
case object ProcessBatch
private var batch: List[String] = List.empty
private val batchSize = 10
private val batchTimeout = 5.seconds
def receive = {
case item: String =>
batch = item :: batch
if (batch.size == 1) {
// Start batch timeout on first item
timers.startSingleTimer("batch-timeout", ProcessBatch, batchTimeout)
}
if (batch.size >= batchSize) {
// Process full batch immediately
processBatch()
}
case ProcessBatch =>
if (batch.nonEmpty) {
processBatch()
}
}
private def processBatch(): Unit = {
timers.cancel("batch-timeout")
val currentBatch = batch.reverse
batch = List.empty
println(s"Processing batch of ${currentBatch.size} items: $currentBatch")
// Process batch logic here
}
}/**
* Light Array Revolving Scheduler - default implementation
*/
class LightArrayRevolverScheduler(
config: Config,
log: LoggingAdapter,
threadFactory: ThreadFactory
) extends Scheduler with Closeable
/**
* Hash wheel timer implementation
*/
class HashedWheelTimer(
tickDuration: Duration,
wheelSize: Int,
threadFactory: ThreadFactory
) extends Scheduler
/**
* Abstract base for custom schedulers
*/
abstract class AbstractScheduler extends Scheduler {
/**
* Maximum supported delay
*/
def maxFrequency: Double
}
/**
* Timer configuration
*/
trait TimerConfig {
def tickDuration: Duration
def wheelSize: Int
def tasksPerTick: Int
}
/**
* Scheduled task interface
*/
trait ScheduledTask {
def run(): Unit
def cancel(): Unit
def isCancelled: Boolean
}