Event sourcing library for Akka persistence providing comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model.
—
Core persistent actor functionality for event sourcing patterns with automatic state recovery and configurable persistence backends.
The main trait for implementing persistent actors using event sourcing patterns.
/**
 * Scala API for persistent actors. Mix this trait in to implement the command
 * sourcing or Event Sourcing pattern.
 *
 * Implementors provide [[receiveCommand]] and [[receiveRecover]]; the remaining
 * members are listed here by signature only.
 */
trait PersistentActor extends Eventsourced with PersistenceStash with PersistenceIdentity with PersistenceRecovery {

  /**
   * Asynchronously persists a single event, with stashing.
   *
   * @param event   the event to persist
   * @param handler callback that receives the event
   * @tparam A      the event type
   */
  def persist[A](event: A)(handler: A => Unit): Unit

  /**
   * Asynchronously persists several events atomically, with stashing.
   *
   * @param events  the events to persist as one batch
   * @param handler callback that receives each event
   * @tparam A      the event type
   */
  def persistAll[A](events: immutable.Seq[A])(handler: A => Unit): Unit

  /**
   * Asynchronously persists a single event, without stashing.
   *
   * @param event   the event to persist
   * @param handler callback that receives the event
   * @tparam A      the event type
   */
  def persistAsync[A](event: A)(handler: A => Unit): Unit

  /**
   * Asynchronously persists several events, without stashing.
   *
   * @param events  the events to persist
   * @param handler callback that receives each event
   * @tparam A      the event type
   */
  def persistAllAsync[A](events: immutable.Seq[A])(handler: A => Unit): Unit

  /**
   * Defers execution of `handler`, with stashing.
   *
   * @param event   the value handed to `handler`
   * @param handler callback to run later
   * @tparam A      the type of `event`
   */
  def defer[A](event: A)(handler: A => Unit): Unit

  /**
   * Defers execution of `handler`, without stashing.
   *
   * @param event   the value handed to `handler`
   * @param handler callback to run later
   * @tparam A      the type of `event`
   */
  def deferAsync[A](event: A)(handler: A => Unit): Unit

  /** Recovery handler: invoked during recovery for every replayed event. */
  def receiveRecover: Receive

  /** Command handler: invoked for incoming messages. */
  def receiveCommand: Receive
}

Usage Examples:
import akka.persistence._
/**
 * Minimal persistent actor whose whole state is the most recently persisted string.
 *
 * Protocol: `"cmd"` persists the event `"evt"` and replies `"ok"`; `"get"` replies
 * with the current state.
 */
class MyPersistentActor extends PersistentActor {

  // Current state; overwritten by every event and by an offered snapshot.
  var state: String = ""

  override def persistenceId: String = "my-persistent-actor"

  /** Rebuilds `state` from an offered snapshot and from replayed events. */
  override def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: String) =>
      state = snapshot
    case evt: String =>
      state = evt
  }

  /** Handles the `"get"` query and the `"cmd"` command. */
  override def receiveCommand: Receive = {
    case "get" =>
      sender() ! state
    case "cmd" =>
      persist("evt") { persisted =>
        // Update state and acknowledge from inside the persist handler.
        state = persisted
        sender() ! "ok"
      }
  }
}

Java API for persistent actors.
/**
 * Java API for persistent actors.
 *
 * Subclasses override [[createReceive]] and [[createReceiveRecover]]; the `persist*`
 * members take a `Procedure` callback instead of a Scala function.
 */
abstract class AbstractPersistentActor extends AbstractActor with Eventsourced with PersistenceStash with PersistenceIdentity with PersistenceRecovery {

  /** Recovery handler; override to handle recovery events. */
  def createReceiveRecover(): AbstractActor.Receive

  /** Command handler; override to handle commands. */
  def createReceive(): AbstractActor.Receive

  /**
   * Persists one event.
   *
   * @param event   the event to persist
   * @param handler Java-style callback that receives the event
   * @tparam A      the event type
   */
  def persist[A](event: A, handler: Procedure[A]): Unit

  /**
   * Persists several events.
   *
   * @param events  the events to persist
   * @param handler Java-style callback that receives each event
   * @tparam A      the event type
   */
  def persistAll[A](events: java.util.List[A], handler: Procedure[A]): Unit

  /**
   * Persists one event asynchronously.
   *
   * @param event   the event to persist
   * @param handler Java-style callback that receives the event
   * @tparam A      the event type
   */
  def persistAsync[A](event: A, handler: Procedure[A]): Unit

  /**
   * Persists several events asynchronously.
   *
   * @param events  the events to persist
   * @param handler Java-style callback that receives each event
   * @tparam A      the event type
   */
  def persistAllAsync[A](events: java.util.List[A], handler: Procedure[A]): Unit
}

Java API combining AbstractPersistentActor with timers functionality.
/**
 * Java API: an [[AbstractPersistentActor]] with the `Timers` trait mixed in.
 */
abstract class AbstractPersistentActorWithTimers extends AbstractPersistentActor with Timers

Identifies persistent actors with unique identifiers and plugin configurations.
/** Identity of a persistent entity together with the ids of the plugins it uses. */
trait PersistenceIdentity {

  /** Unique identifier of the persistent entity; has no default and must be implemented. */
  def persistenceId: String

  /** Configuration id of the journal plugin; the empty string by default. */
  def journalPluginId: String = ""

  /** Configuration id of the snapshot plugin; the empty string by default. */
  def snapshotPluginId: String = ""
}

Defines recovery behavior and configuration.
/** Supplies the recovery configuration of a persistent actor. */
trait PersistenceRecovery {

  /** Recovery configuration; `Recovery()` (all defaults) unless overridden. */
  def recovery: Recovery = Recovery()
}

Stashing functionality for persistent actors with configurable overflow strategies.
/** Stash support for persistent actors with a selectable overflow strategy. */
trait PersistenceStash extends Stash {

  /** Strategy applied when the stash overflows; `DiscardToDeadLetterStrategy` unless overridden. */
  def internalStashOverflowStrategy: StashOverflowStrategy = DiscardToDeadLetterStrategy
}

Runtime configuration for persistence plugins.
/** Extra plugin configuration supplied at runtime. */
trait RuntimePluginConfig {

  /** Additional configuration for the journal plugin; empty unless overridden. */
  def journalPluginConfig: Config = ConfigFactory.empty()

  /** Additional configuration for the snapshot plugin; empty unless overridden. */
  def snapshotPluginConfig: Config = ConfigFactory.empty()
}

Configures the recovery process for persistent actors.
/**
 * Recovery mode configuration.
 *
 * @param fromSnapshot criteria used to select the snapshot recovery starts from;
 *                     `SnapshotSelectionCriteria.Latest` by default
 * @param toSequenceNr sequence number bound for recovery; `Long.MaxValue` by default
 *                     (presumably an inclusive upper bound, confirm against the Akka API docs)
 * @param replayMax    limit on replay; `Long.MaxValue` by default
 *                     (presumably the maximum number of replayed messages, confirm against the Akka API docs)
 */
case class Recovery(
  fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
  toSequenceNr: Long = Long.MaxValue,
  replayMax: Long = Long.MaxValue
)
/** Predefined [[Recovery]] configurations and Java API factory methods. */
object Recovery {

  /**
   * Skip-recovery configuration: no snapshot is selected and `toSequenceNr` is 0.
   *
   * Only `fromSnapshot` and `toSequenceNr` are set, so `replayMax` keeps its
   * default (`Long.MaxValue`) rather than 0.
   */
  val none: Recovery = Recovery(toSequenceNr = 0L, fromSnapshot = SnapshotSelectionCriteria.None)

  /** Java API: recovery with all defaults. */
  def create(): Recovery = Recovery()

  /**
   * Java API: recovery starting from a snapshot chosen by the given criteria.
   *
   * @param fromSnapshot snapshot selection criteria
   */
  def create(fromSnapshot: SnapshotSelectionCriteria): Recovery = Recovery(fromSnapshot)

  /**
   * Java API: recovery bounded by a sequence number.
   *
   * @param toSequenceNr sequence number bound for recovery
   */
  def create(toSequenceNr: Long): Recovery = Recovery(toSequenceNr = toSequenceNr)

  /**
   * Java API: recovery with snapshot criteria and a sequence number bound.
   *
   * @param fromSnapshot snapshot selection criteria
   * @param toSequenceNr sequence number bound for recovery
   */
  def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long): Recovery =
    Recovery(fromSnapshot, toSequenceNr)

  /**
   * Java API: recovery with snapshot criteria, a sequence number bound and a replay limit.
   *
   * @param fromSnapshot snapshot selection criteria
   * @param toSequenceNr sequence number bound for recovery
   * @param replayMax    replay limit
   */
  def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long, replayMax: Long): Recovery =
    Recovery(fromSnapshot, toSequenceNr, replayMax)
}

Messages sent during the recovery process.
/** Message sent once journal replay has finished. */
case object RecoveryCompleted {

  /** Java API: returns this singleton. */
  def getInstance: RecoveryCompleted.type = RecoveryCompleted
}
/**
 * Exception thrown when recovery times out.
 *
 * @param message detail message passed on to `RuntimeException`
 */
class RecoveryTimedOut(message: String) extends RuntimeException(message)

/** Base trait for stash overflow handling strategies */
sealed trait StashOverflowStrategy

/** Strategy that discards messages to dead letters when the stash overflows. */
case object DiscardToDeadLetterStrategy extends StashOverflowStrategy {

  /** Java API: returns this singleton. */
  def getInstance: DiscardToDeadLetterStrategy.type = DiscardToDeadLetterStrategy
}

/** Strategy that throws an exception when the stash overflows. */
case object ThrowOverflowExceptionStrategy extends StashOverflowStrategy {

  /** Java API: returns this singleton. */
  def getInstance: ThrowOverflowExceptionStrategy.type = ThrowOverflowExceptionStrategy
}

/**
 * Strategy that replies with a predefined response and discards the message.
 *
 * @param response the predefined reply
 */
case class ReplyToStrategy(response: Any) extends StashOverflowStrategy

/** Interface for configuring stash overflow strategies */
trait StashOverflowStrategyConfigurator {

  /**
   * Builds a stash overflow strategy from configuration.
   *
   * @param config configuration to read the strategy settings from
   * @return the strategy to apply on stash overflow
   */
  def create(config: Config): StashOverflowStrategy
}

The Eventsourced trait provides access to internal persistent actor state.
/** Exposes the internal state of a persistent actor. */
trait Eventsourced extends Actor {

  /** Highest sequence number received so far. */
  def lastSequenceNr: Long

  /** Current snapshot sequence number. */
  def snapshotSequenceNr: Long

  /** `true` while recovery is in progress. */
  def recoveryRunning: Boolean

  /** `true` once recovery has completed. */
  def recoveryFinished: Boolean

  /**
   * Deletes persistent messages up to a sequence number.
   *
   * @param toSequenceNr sequence number up to which messages are deleted
   */
  def deleteMessages(toSequenceNr: Long): Unit
}

import akka.persistence._
import akka.actor.{ActorRef, Props}
/** Events: the facts the counter persists. */
sealed trait CounterEvent

/** The counter was increased by one. */
case object Incremented extends CounterEvent

/** The counter was decreased by one. */
case object Decremented extends CounterEvent

/** Commands: the requests the counter accepts. */
sealed trait CounterCommand

/** Request to increase the counter by one. */
case object Increment extends CounterCommand

/** Request to decrease the counter by one. */
case object Decrement extends CounterCommand

/** Request for the current value. */
case object GetValue extends CounterCommand
/**
 * Event-sourced counter.
 *
 * `Increment` and `Decrement` persist the matching event, apply it and reply with
 * the new value; `GetValue` replies with the current value. A snapshot of the value
 * is saved whenever the sequence number of the event just persisted is a multiple
 * of 10, so recovery can start from a snapshot instead of replaying every event.
 */
class Counter extends PersistentActor {
  override def persistenceId: String = "counter-1"

  private var value = 0

  /** Rebuilds `value` from an offered snapshot and from replayed events. */
  override def receiveRecover: Receive = {
    case event: CounterEvent => applyEvent(event)
    case SnapshotOffer(_, snapshot: Int) => value = snapshot
  }

  /** Handles the counter commands. */
  override def receiveCommand: Receive = {
    case Increment => persistAndReply(Incremented)
    case Decrement => persistAndReply(Decremented)
    case GetValue => sender() ! value
    // Manual snapshot request, kept for compatibility: honoured only while
    // lastSequenceNr is a multiple of 10.
    case "snap" if lastSequenceNr % 10 == 0 => saveSnapshot(value)
  }

  /** Applies one event to the in-memory state; shared by recovery and command handling. */
  private def applyEvent(event: CounterEvent): Unit = event match {
    case Incremented => value += 1
    case Decremented => value -= 1
  }

  /** Persists `event`, applies it, replies with the new value and snapshots every 10 events. */
  private def persistAndReply(event: CounterEvent): Unit =
    persist(event) { persisted =>
      applyEvent(persisted)
      sender() ! value
      // Take a snapshot every 10 events.
      if (lastSequenceNr % 10 == 0) saveSnapshot(value)
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13