CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-persistence-2-13

Event sourcing library for Akka persistence providing comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model.

Pending
Overview
Eval results
Files

persistent-actors.mddocs/

Persistent Actors

Core persistent actor functionality for event sourcing patterns with automatic state recovery and configurable persistence backends.

Capabilities

PersistentActor Trait

The main trait for implementing persistent actors using event sourcing patterns.

/**
 * Scala API for persistent actors that can be used to implement command or Event Sourcing patterns
 */
trait PersistentActor extends Eventsourced with PersistenceStash with PersistenceIdentity with PersistenceRecovery {
  
  /** Asynchronously persists an event with stashing */
  def persist[A](event: A)(handler: A => Unit): Unit
  
  /** Asynchronously persists multiple events atomically with stashing */
  def persistAll[A](events: immutable.Seq[A])(handler: A => Unit): Unit
  
  /** Asynchronously persists an event without stashing */
  def persistAsync[A](event: A)(handler: A => Unit): Unit
  
  /** Asynchronously persists multiple events without stashing */
  def persistAllAsync[A](events: immutable.Seq[A])(handler: A => Unit): Unit
  
  /** Defer handler execution with stashing */
  def defer[A](event: A)(handler: A => Unit): Unit
  
  /** Defer handler execution without stashing */
  def deferAsync[A](event: A)(handler: A => Unit): Unit
  
  /** Recovery handler - called during recovery for each replayed event */
  def receiveRecover: Receive
  
  /** Command handler - called for incoming messages */
  def receiveCommand: Receive
}

Usage Examples:

import akka.persistence._

class MyPersistentActor extends PersistentActor {
  override def persistenceId: String = "my-persistent-actor"
  
  var state: String = ""
  
  override def receiveRecover: Receive = {
    case evt: String => state = evt
    case SnapshotOffer(_, snapshot: String) => state = snapshot
  }
  
  override def receiveCommand: Receive = {
    case "cmd" =>
      persist("evt") { event =>
        state = event
        sender() ! "ok"
      }
    case "get" => sender() ! state
  }
}

AbstractPersistentActor (Java API)

Java API for persistent actors.

/**
 * Java API for persistent actors
 */
abstract class AbstractPersistentActor extends AbstractActor with Eventsourced with PersistenceStash with PersistenceIdentity with PersistenceRecovery {
  
  /** Recovery handler - override to handle recovery events */
  def createReceiveRecover(): AbstractActor.Receive
  
  /** Command handler - override to handle commands */
  def createReceive(): AbstractActor.Receive
  
  /** Persist event with Java-style handler */
  def persist[A](event: A, handler: Procedure[A]): Unit
  
  /** Persist multiple events with Java-style handler */  
  def persistAll[A](events: java.util.List[A], handler: Procedure[A]): Unit
  
  /** Persist event asynchronously with Java-style handler */
  def persistAsync[A](event: A, handler: Procedure[A]): Unit
  
  /** Persist multiple events asynchronously with Java-style handler */
  def persistAllAsync[A](events: java.util.List[A], handler: Procedure[A]): Unit
}

AbstractPersistentActorWithTimers

Java API combining AbstractPersistentActor with timers functionality.

/**
 * Java API combining AbstractPersistentActor with timers functionality
 */
abstract class AbstractPersistentActorWithTimers extends AbstractPersistentActor with Timers

Core Supporting Traits

PersistenceIdentity

Identifies persistent actors with unique identifiers and plugin configurations.

trait PersistenceIdentity {
  /** Unique identifier for the persistent entity */
  def persistenceId: String
  
  /** Journal plugin configuration id (defaults to empty) */
  def journalPluginId: String = ""
  
  /** Snapshot plugin configuration id (defaults to empty) */
  def snapshotPluginId: String = ""
}

PersistenceRecovery

Defines recovery behavior and configuration.

trait PersistenceRecovery {
  /** Recovery configuration (defaults to Recovery()) */
  def recovery: Recovery = Recovery()
}

PersistenceStash

Stashing functionality for persistent actors with configurable overflow strategies.

trait PersistenceStash extends Stash {
  /** Strategy for handling stash overflow */
  def internalStashOverflowStrategy: StashOverflowStrategy = DiscardToDeadLetterStrategy
}

RuntimePluginConfig

Runtime configuration for persistence plugins.

trait RuntimePluginConfig {
  /** Additional journal plugin configuration */
  def journalPluginConfig: Config = ConfigFactory.empty()
  
  /** Additional snapshot plugin configuration */
  def snapshotPluginConfig: Config = ConfigFactory.empty()
}

Recovery Types

Recovery Configuration

Configures the recovery process for persistent actors.

/**
 * Recovery mode configuration
 */
case class Recovery(
  fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
  toSequenceNr: Long = Long.MaxValue,
  replayMax: Long = Long.MaxValue
)

object Recovery {
  /** Skip recovery configuration */
  val none: Recovery = Recovery(SnapshotSelectionCriteria.None, 0L, 0L)
  
  /** Java API factory methods */
  def create(): Recovery = Recovery()
  def create(fromSnapshot: SnapshotSelectionCriteria): Recovery = Recovery(fromSnapshot)
  def create(toSequenceNr: Long): Recovery = Recovery(toSequenceNr = toSequenceNr)
  def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long): Recovery = Recovery(fromSnapshot, toSequenceNr)
}

Recovery Messages

Messages sent during the recovery process.

/** Sent when journal replay is finished */
case object RecoveryCompleted {
  /** Java API */
  def getInstance: RecoveryCompleted.type = this
}

/** Exception thrown when recovery times out */
class RecoveryTimedOut(message: String) extends RuntimeException(message)

Stash Overflow Strategies

Base Strategy Trait

/** Base trait for stash overflow handling strategies */
sealed trait StashOverflowStrategy

Built-in Strategies

/** Discard messages to dead letters when stash overflows */
case object DiscardToDeadLetterStrategy extends StashOverflowStrategy {
  /** Java API */  
  def getInstance: DiscardToDeadLetterStrategy.type = this
}

/** Throw exception when stash overflows */
case object ThrowOverflowExceptionStrategy extends StashOverflowStrategy {
  /** Java API */
  def getInstance: ThrowOverflowExceptionStrategy.type = this  
}

/** Reply with predefined response and discard message */
case class ReplyToStrategy(response: Any) extends StashOverflowStrategy

Strategy Configurator

/** Interface for configuring stash overflow strategies */
trait StashOverflowStrategyConfigurator {
  def create(config: Config): StashOverflowStrategy
}

Internal Actor State Access

The Eventsourced trait provides access to internal persistent actor state.

trait Eventsourced extends Actor {
  /** Highest sequence number received */
  def lastSequenceNr: Long
  
  /** Current snapshot sequence number */  
  def snapshotSequenceNr: Long
  
  /** Whether recovery is in progress */
  def recoveryRunning: Boolean
  
  /** Whether recovery has completed */
  def recoveryFinished: Boolean
  
  /** Delete persistent messages up to sequence number */
  def deleteMessages(toSequenceNr: Long): Unit
}

Example: Complete Event Sourced Actor

import akka.persistence._
import akka.actor.{ActorRef, Props}

// Events
sealed trait CounterEvent
case object Incremented extends CounterEvent
case object Decremented extends CounterEvent

// Commands  
sealed trait CounterCommand
case object Increment extends CounterCommand
case object Decrement extends CounterCommand
case object GetValue extends CounterCommand

class Counter extends PersistentActor {
  override def persistenceId: String = "counter-1"
  
  private var value = 0
  
  override def receiveRecover: Receive = {
    case Incremented => value += 1
    case Decremented => value -= 1
    case SnapshotOffer(_, snapshot: Int) => value = snapshot
  }
  
  override def receiveCommand: Receive = {
    case Increment =>
      persist(Incremented) { _ =>
        value += 1
        sender() ! value
      }
    case Decrement =>
      persist(Decremented) { _ => 
        value -= 1
        sender() ! value
      }
    case GetValue => sender() ! value
    
    // Take snapshot every 10 events  
    case "snap" if lastSequenceNr % 10 == 0 => saveSnapshot(value)
  }
}

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13

docs

at-least-once-delivery.md

durable-state.md

event-adapters.md

index.md

journal-api.md

persistent-actors.md

plugin-development.md

snapshots.md

tile.json