Akka Actor provides the foundational Actor Model implementation for building concurrent, distributed, resilient and elastic applications with supervision hierarchies and location transparency.
Akka's scheduling system provides task scheduling for delayed and periodic execution with cancellation support, plus actor-specific timer management.
Global scheduler for delayed and periodic task execution.
/**
 * Contract for scheduling delayed and repeating work.
 *
 * Each scheduling method returns a [[Cancellable]] and takes the
 * `ExecutionContext` through an implicit second parameter list.
 */
trait Scheduler {

  // ---- repeating schedules ----

  /**
   * Sends `message` to `receiver` repeatedly.
   *
   * @param initialDelay time to wait before the first message
   * @param interval     time between consecutive messages
   * @param receiver     actor the message is sent to
   * @param message      payload to send
   * @param executor     implicit execution context
   * @return handle for cancelling the schedule
   */
  def schedule(initialDelay: Duration, interval: Duration, receiver: ActorRef, message: Any)(
      implicit executor: ExecutionContext): Cancellable

  /**
   * Runs `runnable` repeatedly.
   *
   * @param initialDelay time to wait before the first execution
   * @param interval     time between consecutive executions
   * @param runnable     task to execute
   * @param executor     implicit execution context
   * @return handle for cancelling the schedule
   */
  def schedule(initialDelay: Duration, interval: Duration, runnable: Runnable)(
      implicit executor: ExecutionContext): Cancellable

  // ---- one-shot schedules ----

  /**
   * Sends `message` to `receiver` a single time after `delay`.
   *
   * @param delay    time to wait before sending
   * @param receiver actor the message is sent to
   * @param message  payload to send
   * @param executor implicit execution context
   * @return handle for cancelling the pending send
   */
  def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any)(
      implicit executor: ExecutionContext): Cancellable

  /**
   * Runs `runnable` a single time after `delay`.
   *
   * @param delay    time to wait before execution
   * @param runnable task to execute
   * @param executor implicit execution context
   * @return handle for cancelling the pending execution
   */
  def scheduleOnce(delay: Duration, runnable: Runnable)(
      implicit executor: ExecutionContext): Cancellable

  /**
   * Returns the current time in milliseconds.
   */
  def currentTimeMillis(): Long
}
Usage Examples:
import scala.concurrent.duration._
import akka.actor._
/**
 * Demonstrates the global scheduler from inside an actor.
 *
 * Messages:
 *  - `"start-periodic"` (idle only) starts a repeating `"tick"` to self and
 *    switches to the [[running]] behaviour.
 *  - `"tick"` / `"stop-periodic"` are handled while running; stopping cancels the
 *    tick schedule and reverts to the initial behaviour.
 *  - `"schedule-once"`, `"schedule-runnable"` and `"delayed-message"` are handled
 *    in both behaviours, so a delayed message is not dropped just because the
 *    actor changed state between scheduling and delivery.
 */
class SchedulingActor extends Actor {
  import context.{dispatcher, system}

  // Handles of every repeating schedule started here, so postStop can cancel them.
  private var repeatingTasks: List[Cancellable] = Nil

  def receive: Receive = idle.orElse(shared)

  /** Behaviour specific to the state in which no periodic tick is active. */
  private def idle: Receive = {
    case "start-periodic" =>
      // Schedule periodic message to self
      val cancellable = system.scheduler.schedule(
        initialDelay = 1.second,
        interval = 5.seconds,
        receiver = self,
        message = "tick"
      )
      repeatingTasks ::= cancellable
      // Hand the handle to the next behaviour so "stop-periodic" can cancel it
      context.become(running(cancellable))
  }

  /** Messages accepted regardless of whether the periodic tick is active. */
  private def shared: Receive = {
    case "schedule-once" =>
      // Schedule single delayed message
      system.scheduler.scheduleOnce(10.seconds, self, "delayed-message")

    case "schedule-runnable" =>
      // Schedule periodic task execution
      val task = new Runnable {
        def run(): Unit = println(s"Task executed at ${System.currentTimeMillis()}")
      }
      repeatingTasks ::= system.scheduler.schedule(2.seconds, 3.seconds, task)

    case "delayed-message" =>
      println("Delayed message received")
  }

  /**
   * Behaviour while the periodic tick is active.
   *
   * @param cancellable handle of the active tick schedule
   */
  def running(cancellable: Cancellable): Receive = {
    val periodic: Receive = {
      case "tick" =>
        println("Periodic tick received")
      case "stop-periodic" =>
        cancellable.cancel()
        context.unbecome()
    }
    periodic.orElse(shared)
  }

  // Repeating schedules are not tied to this actor's lifecycle; without this they
  // would keep firing after the actor has stopped.
  override def postStop(): Unit = repeatingTasks.foreach(_.cancel())
}
// Usage
// Create the actor, then send it both commands in order
val actor = system.actorOf(Props[SchedulingActor])
Seq("start-periodic", "schedule-once").foreach(command => actor ! command)
Handle for cancelling scheduled tasks.
/**
 * Handle for a scheduled operation that can still be called off.
 */
trait Cancellable {

  /**
   * Reports whether this operation has been cancelled.
   *
   * @return `true` if cancelled
   */
  def isCancelled: Boolean

  /**
   * Cancels the scheduled operation.
   *
   * @return `true` if cancellation was successful, `false` if already completed
   */
  def cancel(): Boolean
}
/**
 * Companion utilities for [[Cancellable]].
 *
 * This is a signature listing: the members are shown without bodies.
 *
 * NOTE(review): Akka's public API appears to expose the pre-cancelled instance as
 * `Cancellable.alreadyCancelled` — confirm these member names against the Akka
 * version in use.
 */
object Cancellable {

  /** A [[Cancellable]] instance that is already cancelled. */
  val cancelled: Cancellable

  /**
   * Combines several handles into one that cancels all of them when cancelled.
   *
   * @param cancellables handles to combine
   * @return the combined handle
   */
  def apply(cancellables: Cancellable*): Cancellable
}
Usage Examples:
/**
 * Example actor that keeps the handle of every task it schedules so the tasks can
 * be inspected and cancelled later.
 *
 * Messages:
 *  - `"start-tasks"`  schedules three tasks and records their handles
 *  - `"cancel-all"`   cancels every recorded task and replies to the sender
 *  - `"check-status"` replies with the `isCancelled` flag of every recorded task
 *  - anything else is printed as a received scheduled message
 */
class CancellableActor extends Actor {
  import context.{dispatcher, system}

  // Handles of all tasks scheduled since the last "cancel-all"
  var scheduledTasks: List[Cancellable] = List.empty

  def receive = {
    case "start-tasks" =>
      // Schedule multiple tasks
      val task1 = system.scheduler.scheduleOnce(5.seconds, self, "task1")
      val task2 = system.scheduler.schedule(1.second, 2.seconds, self, "task2")
      val task3 = system.scheduler.scheduleOnce(10.seconds, self, "task3")
      // Append instead of overwrite: a repeated "start-tasks" must not drop the
      // handles from the previous round, or those tasks (including the repeating
      // one) could never be cancelled.
      scheduledTasks = scheduledTasks ++ List(task1, task2, task3)

    case "cancel-all" =>
      // Cancel all scheduled tasks
      scheduledTasks.foreach(_.cancel())
      scheduledTasks = List.empty
      sender() ! "All tasks cancelled"

    case "check-status" =>
      val statuses = scheduledTasks.map(_.isCancelled)
      sender() ! s"Task cancellation status: $statuses"

    case msg =>
      println(s"Received scheduled message: $msg")
  }

  override def postStop(): Unit = {
    // Clean up on actor stop
    scheduledTasks.foreach(_.cancel())
  }
}
Actor-specific timer management with automatic cleanup.
/**
 * Mix-in that gives an actor access to a [[TimerScheduler]].
 *
 * The self-type restricts this trait to classes that are also an `Actor`.
 */
trait Timers { this: Actor =>

  /** The timer scheduler instance for this actor. */
  def timers: TimerScheduler
}
/**
 * Interface for starting, querying and cancelling timers identified by a key.
 */
trait TimerScheduler {

  // ---- starting timers ----

  /**
   * Starts a single-shot timer.
   *
   * @param key   unique timer identifier
   * @param msg   message to send when the timer fires
   * @param delay delay before firing
   */
  def startSingleTimer(key: Any, msg: Any, delay: Duration): Unit

  /**
   * Starts a repeating timer with a fixed delay between executions.
   *
   * @param key   unique timer identifier
   * @param msg   message to send repeatedly
   * @param delay delay between executions
   */
  def startTimerWithFixedDelay(key: Any, msg: Any, delay: Duration): Unit

  /**
   * Starts a repeating timer at a fixed rate, where the interval is measured from
   * the start of the previous execution.
   *
   * @param key      unique timer identifier
   * @param msg      message to send repeatedly
   * @param interval interval between execution starts
   */
  def startTimerAtFixedRate(key: Any, msg: Any, interval: Duration): Unit

  // ---- querying and cancelling ----

  /**
   * Checks whether a timer is active.
   *
   * @param key timer identifier to check
   * @return `true` if the timer exists and is active
   */
  def isTimerActive(key: Any): Boolean

  /**
   * Cancels the timer with the given key.
   *
   * @param key timer identifier to cancel
   */
  def cancel(key: Any): Unit

  /** Cancels all timers. */
  def cancelAll(): Unit
}
Usage Examples:
import akka.actor.{Actor, Timers}
import scala.concurrent.duration._
// Timer messages
// Message delivered by the "tick" fixed-rate timer
case object Tick
// Message delivered by the "health" fixed-delay timer
case object HealthCheck
// Message delivered by the "cleanup" fixed-delay timer
case object Cleanup
/**
 * Example actor driven by the `Timers` mix-in.
 *
 * On start it arms a one-shot "startup" timer and a fixed-delay "health" timer.
 * String commands start, stop and query further timers; the case-object messages
 * are the payloads those timers deliver.
 */
class TimerActor extends Actor with Timers {

  override def preStart(): Unit = {
    // Arm the startup notification first, then the recurring health check
    timers.startSingleTimer("startup", "initialized", 1.second)
    timers.startTimerWithFixedDelay("health", HealthCheck, 30.seconds)
  }

  def receive = timerEvents.orElse(commands)

  /** Payloads delivered by the timers themselves. */
  private def timerEvents: Receive = {
    case Tick =>
      println(s"Tick at ${System.currentTimeMillis()}")

    case HealthCheck =>
      // Health check logic
      println("Performing health check")

    case Cleanup =>
      // Cleanup logic
      println("Performing cleanup")
  }

  /** String commands, including the "initialized" startup notification. */
  private def commands: Receive = {
    case "initialized" =>
      println("Actor initialized")
      // Begin the periodic tick once start-up has been announced
      timers.startTimerAtFixedRate("tick", Tick, 5.seconds)

    case "start-cleanup" =>
      // Periodic cleanup with a fixed delay between runs
      timers.startTimerWithFixedDelay("cleanup", Cleanup, 1.minute)

    case "stop-cleanup" =>
      timers.cancel("cleanup")
      println("Cleanup timer stopped")

    case "status" =>
      val tickRunning = timers.isTimerActive("tick")
      val cleanupRunning = timers.isTimerActive("cleanup")
      sender() ! s"Tick: $tickRunning, Cleanup: $cleanupRunning"

    case "stop-all" =>
      timers.cancelAll()
      println("All timers cancelled")
  }

  override def postStop(): Unit = {
    // Timers are automatically cancelled when actor stops
    println("Actor stopped, timers cleaned up")
  }
}
// Usage with timer keys
/**
 * Example actor that manages single-shot timers addressed by a caller-supplied key.
 *
 * Accepts `("start", key, delay)`, `("stop", key)` and `("check", key)` tuples; a
 * fired timer arrives as the string `"timer-<key>"`.
 */
class KeyedTimerActor extends Actor with Timers {

  // Prefix of the message a fired timer delivers
  private val firedPrefix = "timer-"

  def receive = {
    case ("start", name: String, after: Duration) =>
      timers.startSingleTimer(name, firedPrefix + name, after)

    case ("stop", name: String) =>
      timers.cancel(name)

    case ("check", name: String) =>
      val stillActive = timers.isTimerActive(name)
      sender() ! s"Timer $name active: $stillActive"

    case fired: String if fired.startsWith(firedPrefix) =>
      // Recover the key from the fired message
      val name = fired.stripPrefix(firedPrefix)
      println(s"Timer $name fired")
  }
}
Common patterns for scheduling and timer usage.
/**
 * Retries an asynchronous operation with exponential backoff.
 *
 * Signature only; no implementation is shown. The parameter notes below are
 * inferred from names and defaults — confirm against the implementation.
 *
 * @param operation     factory for the asynchronous attempt (presumably invoked once per try)
 * @param maxRetries    presumably the upper bound on retries
 * @param initialDelay  presumably the delay before the first retry
 * @param backoffFactor backoff factor, defaulting to 2.0
 * @param system        implicit actor system
 * @return a future of the operation's result type
 */
def scheduleRetry[T](
    operation: () => Future[T],
    maxRetries: Int,
    initialDelay: Duration,
    backoffFactor: Double = 2.0)(implicit system: ActorSystem): Future[T]
/**
 * Computes a delay with jitter applied, to avoid a thundering herd.
 *
 * Signature only; no implementation is shown. How `jitterFactor` is applied to
 * `baseDelay` cannot be determined from here — confirm against the implementation.
 *
 * @param baseDelay    delay the jitter is applied to
 * @param jitterFactor jitter factor, defaulting to 0.1
 * @return the resulting delay
 */
def scheduleWithJitter(baseDelay: Duration, jitterFactor: Double = 0.1): Duration
Usage Examples:
import scala.util.Random
/**
 * Example actor that retries a flaky operation with exponential backoff and jitter.
 *
 * `"start-with-retry"` begins a run of up to three attempts. The outcome is sent to
 * the actor that sent `"start-with-retry"`.
 */
class RetryActor extends Actor with Timers {
  import akka.actor.ActorRef

  /**
   * One attempt of the operation.
   *
   * `replyTo` travels with the message because attempts are delivered by `self`
   * or by a timer, so `sender()` inside the handler is not the original requester.
   */
  case class RetryableOperation(attempt: Int, maxAttempts: Int, replyTo: ActorRef)

  def receive = {
    case "start-with-retry" =>
      // Capture the requester now; it is the only point where sender() is correct
      attemptOperation(1, 3, sender())

    case RetryableOperation(attempt, maxAttempts, replyTo) =>
      performOperation() match {
        case Some(result) =>
          replyTo ! s"Success after $attempt attempts: $result"

        case None if attempt < maxAttempts =>
          // Exponential backoff with jitter
          val baseDelay = (math.pow(2, attempt) * 1000).millis
          val jitter = (Random.nextDouble() * 0.2 + 0.9) // 90-110% of base delay
          val delay = (baseDelay.toMillis * jitter).millis

          timers.startSingleTimer(
            s"retry-$attempt",
            RetryableOperation(attempt + 1, maxAttempts, replyTo),
            delay
          )

        case None =>
          replyTo ! s"Failed after $maxAttempts attempts"
      }
  }

  /** Queues an attempt to self, carrying the requester along for the final reply. */
  private def attemptOperation(attempt: Int, maxAttempts: Int, replyTo: ActorRef): Unit = {
    self ! RetryableOperation(attempt, maxAttempts, replyTo)
  }

  /** Simulated operation: succeeds when a random draw falls below 0.3. */
  private def performOperation(): Option[String] = {
    if (Random.nextDouble() < 0.3) Some("Operation result")
    else None
  }
}
// Batching with timers
/**
 * Example actor that groups incoming strings into batches.
 *
 * A batch is processed when it reaches `batchSize` items, or when the
 * "batch-timeout" timer, armed on the first item of a batch, delivers `ProcessBatch`.
 */
class BatchingActor extends Actor with Timers {
  import context.dispatcher

  case object ProcessBatch

  private val batchSize = 10
  private val batchTimeout = 5.seconds

  // Newest item first; reversed when the batch is processed
  private var batch: List[String] = List.empty

  def receive = {
    case ProcessBatch =>
      if (batch.nonEmpty) processBatch()

    case item: String =>
      batch ::= item
      batch match {
        // Exactly one item: this is the first of a new batch, so arm the timeout
        case _ :: Nil =>
          timers.startSingleTimer("batch-timeout", ProcessBatch, batchTimeout)
        case _ =>
          ()
      }
      // Full batch: process immediately instead of waiting for the timeout
      if (batch.lengthCompare(batchSize) >= 0) processBatch()
  }

  /** Cancels the pending timeout, empties the buffer and prints the batch in arrival order. */
  private def processBatch(): Unit = {
    timers.cancel("batch-timeout")
    val ordered = batch.reverse
    batch = Nil
    println(s"Processing batch of ${ordered.size} items: $ordered")
    // Process batch logic here
  }
}
/**
* Default scheduler implementation (Light Array Revolving Scheduler).
*
* Signature only; the class body is not shown.
*/
class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFactory: ThreadFactory)
  extends Scheduler
  with Closeable
/**
 * Scheduler implementation based on a hash wheel timer.
 *
 * Signature only; the class body is not shown.
 */
class HashedWheelTimer(tickDuration: Duration, wheelSize: Int, threadFactory: ThreadFactory)
  extends Scheduler
/**
 * Abstract base class for custom [[Scheduler]] implementations.
 */
abstract class AbstractScheduler extends Scheduler {

  /**
   * Maximum supported task frequency of this scheduler.
   *
   * NOTE(review): Akka documents this as the inverse of the minimum interval
   * between executions of a recurring task, in Hz — it is a frequency, not a
   * delay. Confirm against the Akka version in use.
   */
  def maxFrequency: Double
}
/**
 * Read-only timer configuration values.
 */
trait TimerConfig {

  /** Configured tick duration. */
  def tickDuration: Duration

  /** Configured wheel size. */
  def wheelSize: Int

  /** Configured number of tasks per tick. */
  def tasksPerTick: Int
}
/**
 * Interface of a scheduled task: it can be run, cancelled, and asked whether it
 * has been cancelled.
 */
trait ScheduledTask {

  /** Reports whether the task has been cancelled. */
  def isCancelled: Boolean

  /** Runs the task. */
  def run(): Unit

  /** Cancels the task. */
  def cancel(): Unit
}
Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-actor-2-11