Event sourcing library for Akka persistence providing comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model.
npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-persistence-2-13@2.8.0Akka Persistence provides comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model. It enables actors to persist their internal state through event logging, allowing them to recover from failures by replaying events. The library supports multiple persistence backends through pluggable journal and snapshot store implementations, and includes specialized persistence patterns like persistent FSMs and persistent views.
libraryDependencies += "com.typesafe.akka" %% "akka-persistence" % "2.8.8"import akka.persistence._
import akka.persistence.journal._
import akka.persistence.snapshot._
import akka.actor.ActorSystem
import scala.concurrent.FutureFor specific components:
import akka.persistence.{PersistentActor, AtLeastOnceDelivery}
import akka.persistence.fsm.PersistentFSM
import akka.persistence.state.scaladsl.DurableStateStore
import akka.persistence.journal.{AsyncWriteJournal, AsyncRecovery}
import akka.persistence.snapshot.SnapshotStore
import akka.Doneimport akka.persistence._
import akka.actor.{ActorSystem, Props}
// Basic persistent actor
class BankAccount extends PersistentActor {
override def persistenceId: String = "bank-account-1"
var balance: Double = 0.0
override def receiveRecover: Receive = {
case evt: TransactionEvent => updateState(evt)
case SnapshotOffer(_, snapshot: Double) => balance = snapshot
}
override def receiveCommand: Receive = {
case Deposit(amount) =>
persist(Deposited(amount)) { evt =>
updateState(evt)
sender() ! s"Deposited $amount, balance: $balance"
}
case Withdraw(amount) if balance >= amount =>
persist(Withdrawn(amount)) { evt =>
updateState(evt)
sender() ! s"Withdrawn $amount, balance: $balance"
}
case GetBalance => sender() ! balance
}
def updateState(event: TransactionEvent): Unit = event match {
case Deposited(amount) => balance += amount
case Withdrawn(amount) => balance -= amount
}
}
// Usage
implicit val system = ActorSystem("bank-system")
val bankAccount = system.actorOf(Props[BankAccount], "bank-account")
bankAccount ! Deposit(100.0)Akka Persistence is built around several key components:
Core persistent actor functionality for event sourcing patterns with automatic state recovery and configurable persistence backends.
trait PersistentActor extends Eventsourced with PersistenceStash with PersistenceIdentity with PersistenceRecovery
def persist[A](event: A)(handler: A => Unit): Unit
def persistAll[A](events: immutable.Seq[A])(handler: A => Unit): Unit
def persistAsync[A](event: A)(handler: A => Unit): Unit
def persistAllAsync[A](events: immutable.Seq[A])(handler: A => Unit): UnitState snapshot functionality for optimizing recovery performance and managing large event histories.
trait Snapshotter extends Actor {
def saveSnapshot(snapshot: Any): Unit
def deleteSnapshot(sequenceNr: Long): Unit
def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit
}
case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
case class SnapshotSelectionCriteria(
maxSequenceNr: Long = Long.MaxValue,
maxTimestamp: Long = Long.MaxValue,
minSequenceNr: Long = 0L,
minTimestamp: Long = 0L
)Event transformation system for schema evolution and event format conversion between domain and journal representations.
trait EventAdapter extends WriteEventAdapter with ReadEventAdapter
trait WriteEventAdapter {
def manifest(event: Any): String
def toJournal(event: Any): Any
}
trait ReadEventAdapter {
def fromJournal(event: Any, manifest: String): EventSeq
}Reliable message delivery with automatic redelivery, confirmation tracking, and configurable retry policies.
trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike {
def deliver(destination: ActorPath)(deliveryIdToMessage: Long => Any): Unit
def confirmDelivery(deliveryId: Long): Boolean
def numberOfUnconfirmed: Int
}Durable state storage for mutable state management with revision tracking and pluggable storage backends.
trait DurableStateStore[A] {
def getObject(persistenceId: String): Future[GetObjectResult[A]]
}
case class GetObjectResult[A](value: Option[A], revision: Long)Journal plugin development interfaces for implementing custom persistence backends with asynchronous write capabilities.
import scala.util.Try
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
}
trait AsyncRecovery {
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(recoveryCallback: PersistentRepr => Unit): Future[Unit]
def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
}Comprehensive plugin development APIs for custom journal and snapshot store implementations with testing and deployment guides.
trait SnapshotStore extends Actor with ActorLogging {
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]
def deleteAsync(metadata: SnapshotMetadata): Future[Unit]
def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]
}// Core persistence types
trait PersistentRepr extends Message {
/** The event payload */
def payload: Any
/** Event adapter manifest */
def manifest: String
/** Persistent actor ID */
def persistenceId: String
/** Sequence number */
def sequenceNr: Long
/** Storage timestamp */
def timestamp: Long
/** Optional metadata */
def metadata: Option[Any]
/** Writer unique identifier */
def writerUuid: String
/** Deletion flag (deprecated) */
def deleted: Boolean
/** Original sender (deprecated) */
def sender: ActorRef
/** Create new persistent repr with payload */
def withPayload(payload: Any): PersistentRepr
/** Create new persistent repr with manifest */
def withManifest(manifest: String): PersistentRepr
/** Create new persistent repr with timestamp */
def withTimestamp(newTimestamp: Long): PersistentRepr
/** Create new persistent repr with metadata */
def withMetadata(metadata: Any): PersistentRepr
/** Create updated copy */
def update(
sequenceNr: Long = sequenceNr,
persistenceId: String = persistenceId,
deleted: Boolean = deleted,
sender: ActorRef = sender,
writerUuid: String = writerUuid
): PersistentRepr
}
object PersistentRepr {
/** Plugin API: value of undefined persistenceId or manifest */
val Undefined = ""
/** Plugin API factory method */
def apply(
payload: Any,
sequenceNr: Long = 0L,
persistenceId: String = PersistentRepr.Undefined,
manifest: String = PersistentRepr.Undefined,
deleted: Boolean = false,
sender: ActorRef = null,
writerUuid: String = PersistentRepr.Undefined
): PersistentRepr
}
case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) {
/** Persistence ID from first message */
def persistenceId: String
/** Lowest sequence number in batch */
def lowestSequenceNr: Long
/** Highest sequence number in batch */
def highestSequenceNr: Long
/** Number of messages */
def size: Int
}
// Recovery configuration
case class Recovery(
fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
toSequenceNr: Long = Long.MaxValue,
replayMax: Long = Long.MaxValue
)
case object RecoveryCompleted
// Snapshot types
case class SnapshotMetadata(
persistenceId: String,
sequenceNr: Long,
timestamp: Long = 0L,
metadata: Option[Any] = None
)
// Journal response messages
case class DeleteMessagesSuccess(toSequenceNr: Long)
case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long)
// Stash overflow strategies
sealed trait StashOverflowStrategy
case object DiscardToDeadLetterStrategy extends StashOverflowStrategy
case object ThrowOverflowExceptionStrategy extends StashOverflowStrategy
case class ReplyToStrategy(response: Any) extends StashOverflowStrategy