or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

at-least-once-delivery.mddurable-state.mdevent-adapters.mdindex.mdjournal-api.mdpersistent-actors.mdplugin-development.mdsnapshots.md
tile.json

tessl/maven-com-typesafe-akka--akka-persistence-2-13

Event sourcing library for Akka persistence providing comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/com.typesafe.akka/akka-persistence_2.13@2.8.x

To install, run

npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-persistence-2-13@2.8.0

index.mddocs/

Akka Persistence

Akka Persistence provides comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model. It enables actors to persist their internal state through event logging, allowing them to recover from failures by replaying events. The library supports multiple persistence backends through pluggable journal and snapshot store implementations, and includes specialized persistence patterns like persistent FSMs and persistent views.

Package Information

  • Package Name: com.typesafe.akka:akka-persistence_2.13
  • Package Type: maven
  • Language: Scala
  • Installation: libraryDependencies += "com.typesafe.akka" %% "akka-persistence" % "2.8.8"

Core Imports

import akka.persistence._
import akka.persistence.journal._
import akka.persistence.snapshot._
import akka.actor.ActorSystem
import scala.concurrent.Future

For specific components:

import akka.persistence.{PersistentActor, AtLeastOnceDelivery}
import akka.persistence.fsm.PersistentFSM
import akka.persistence.state.scaladsl.DurableStateStore
import akka.persistence.journal.{AsyncWriteJournal, AsyncRecovery}
import akka.persistence.snapshot.SnapshotStore
import akka.Done

Basic Usage

import akka.persistence._
import akka.actor.{ActorSystem, Props}

// Basic persistent actor
class BankAccount extends PersistentActor {
  override def persistenceId: String = "bank-account-1"
  
  var balance: Double = 0.0
  
  override def receiveRecover: Receive = {
    case evt: TransactionEvent => updateState(evt)
    case SnapshotOffer(_, snapshot: Double) => balance = snapshot
  }
  
  override def receiveCommand: Receive = {
    case Deposit(amount) =>
      persist(Deposited(amount)) { evt =>
        updateState(evt)
        sender() ! s"Deposited $amount, balance: $balance"
      }
    case Withdraw(amount) if balance >= amount =>
      persist(Withdrawn(amount)) { evt =>
        updateState(evt)
        sender() ! s"Withdrawn $amount, balance: $balance"
      }
    case GetBalance => sender() ! balance
  }
  
  def updateState(event: TransactionEvent): Unit = event match {
    case Deposited(amount) => balance += amount
    case Withdrawn(amount) => balance -= amount
  }
}

// Usage
implicit val system = ActorSystem("bank-system")
val bankAccount = system.actorOf(Props[BankAccount], "bank-account")
bankAccount ! Deposit(100.0)

Architecture

Akka Persistence is built around several key components:

  • Persistent Actors: Event-sourced actors that persist state changes as events and can recover from failures
  • Journal: Pluggable storage backend for events with support for multiple implementations (in-memory, file-based, database)
  • Snapshot Store: Optional storage for actor state snapshots to optimize recovery performance
  • Event Adapters: Transform events between domain and storage formats for schema evolution
  • Recovery System: Configurable recovery process that replays events and restores snapshots during actor initialization
  • At-Least-Once Delivery: Reliable message delivery with automatic redelivery and confirmation tracking

Capabilities

Persistent Actors

Core persistent actor functionality for event sourcing patterns with automatic state recovery and configurable persistence backends.

trait PersistentActor extends Eventsourced with PersistenceStash with PersistenceIdentity with PersistenceRecovery

def persist[A](event: A)(handler: A => Unit): Unit
def persistAll[A](events: immutable.Seq[A])(handler: A => Unit): Unit
def persistAsync[A](event: A)(handler: A => Unit): Unit
def persistAllAsync[A](events: immutable.Seq[A])(handler: A => Unit): Unit

Persistent Actors

Snapshot Management

State snapshot functionality for optimizing recovery performance and managing large event histories.

trait Snapshotter extends Actor {
  def saveSnapshot(snapshot: Any): Unit
  def deleteSnapshot(sequenceNr: Long): Unit  
  def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit
}

case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
case class SnapshotSelectionCriteria(
  maxSequenceNr: Long = Long.MaxValue,
  maxTimestamp: Long = Long.MaxValue,
  minSequenceNr: Long = 0L,
  minTimestamp: Long = 0L
)

Snapshot Management

Event Adapters

Event transformation system for schema evolution and event format conversion between domain and journal representations.

trait EventAdapter extends WriteEventAdapter with ReadEventAdapter

trait WriteEventAdapter {
  def manifest(event: Any): String
  def toJournal(event: Any): Any
}

trait ReadEventAdapter {
  def fromJournal(event: Any, manifest: String): EventSeq
}

Event Adapters

At-Least-Once Delivery

Reliable message delivery with automatic redelivery, confirmation tracking, and configurable retry policies.

trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike {
  def deliver(destination: ActorPath)(deliveryIdToMessage: Long => Any): Unit
  def confirmDelivery(deliveryId: Long): Boolean
  def numberOfUnconfirmed: Int
}

At-Least-Once Delivery

Durable State Management

Durable state storage for mutable state management with revision tracking and pluggable storage backends.

trait DurableStateStore[A] {
  def getObject(persistenceId: String): Future[GetObjectResult[A]]
}

case class GetObjectResult[A](value: Option[A], revision: Long)

Durable State

Journal API

Journal plugin development interfaces for implementing custom persistence backends with asynchronous write capabilities.

import scala.util.Try

trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
  def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]
  def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
}

trait AsyncRecovery {
  def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(recoveryCallback: PersistentRepr => Unit): Future[Unit]
  def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
}

Journal API

Plugin Development

Comprehensive plugin development APIs for custom journal and snapshot store implementations with testing and deployment guides.

trait SnapshotStore extends Actor with ActorLogging {
  def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]
  def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]
  def deleteAsync(metadata: SnapshotMetadata): Future[Unit]
  def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]
}

Plugin Development

Common Types

// Core persistence types
trait PersistentRepr extends Message {
  /** The event payload */
  def payload: Any
  
  /** Event adapter manifest */
  def manifest: String
  
  /** Persistent actor ID */
  def persistenceId: String
  
  /** Sequence number */
  def sequenceNr: Long
  
  /** Storage timestamp */
  def timestamp: Long
  
  /** Optional metadata */
  def metadata: Option[Any]
  
  /** Writer unique identifier */
  def writerUuid: String
  
  /** Deletion flag (deprecated) */
  def deleted: Boolean
  
  /** Original sender (deprecated) */
  def sender: ActorRef
  
  /** Create new persistent repr with payload */
  def withPayload(payload: Any): PersistentRepr
  
  /** Create new persistent repr with manifest */
  def withManifest(manifest: String): PersistentRepr
  
  /** Create new persistent repr with timestamp */
  def withTimestamp(newTimestamp: Long): PersistentRepr
  
  /** Create new persistent repr with metadata */
  def withMetadata(metadata: Any): PersistentRepr
  
  /** Create updated copy */
  def update(
    sequenceNr: Long = sequenceNr,
    persistenceId: String = persistenceId,
    deleted: Boolean = deleted,
    sender: ActorRef = sender,
    writerUuid: String = writerUuid
  ): PersistentRepr
}

object PersistentRepr {
  /** Plugin API: value of undefined persistenceId or manifest */
  val Undefined = ""
  
  /** Plugin API factory method */
  def apply(
    payload: Any,
    sequenceNr: Long = 0L,
    persistenceId: String = PersistentRepr.Undefined,
    manifest: String = PersistentRepr.Undefined,
    deleted: Boolean = false,
    sender: ActorRef = null,
    writerUuid: String = PersistentRepr.Undefined
  ): PersistentRepr
}

case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) {
  /** Persistence ID from first message */
  def persistenceId: String
  
  /** Lowest sequence number in batch */
  def lowestSequenceNr: Long
  
  /** Highest sequence number in batch */
  def highestSequenceNr: Long
  
  /** Number of messages */
  def size: Int
}

// Recovery configuration
case class Recovery(
  fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
  toSequenceNr: Long = Long.MaxValue,
  replayMax: Long = Long.MaxValue
)

case object RecoveryCompleted

// Snapshot types
case class SnapshotMetadata(
  persistenceId: String,
  sequenceNr: Long,
  timestamp: Long = 0L,
  metadata: Option[Any] = None
)

// Journal response messages
case class DeleteMessagesSuccess(toSequenceNr: Long)
case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long)

// Stash overflow strategies
sealed trait StashOverflowStrategy
case object DiscardToDeadLetterStrategy extends StashOverflowStrategy
case object ThrowOverflowExceptionStrategy extends StashOverflowStrategy
case class ReplyToStrategy(response: Any) extends StashOverflowStrategy