CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-persistence-2-13

Event sourcing library for Akka persistence providing comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model.

Pending
Overview
Eval results
Files

docs/snapshots.md

Snapshot Management

State snapshot functionality for optimizing recovery performance and managing large event histories.

Capabilities

Snapshotter Trait

Core snapshot functionality for persistent actors.

/**
 * Snapshot API for persistent actors.
 *
 * The trait extends `Actor` and declares the operations an actor uses to save,
 * load and delete snapshots. Every operation returns `Unit`; in the usage
 * example on this page the outcome of a save or a ranged delete arrives later
 * as a message (`SaveSnapshotSuccess` / `SaveSnapshotFailure`,
 * `DeleteSnapshotsSuccess` / `DeleteSnapshotsFailure`).
 */
trait Snapshotter extends Actor {
  /** Snapshotter identifier */
  def snapshotterId: String
  
  /** Current snapshot sequence number */
  def snapshotSequenceNr: Long
  
  /**
   * Save a snapshot of current state.
   *
   * @param snapshot the state to store; typed `Any`, so the caller chooses the payload type
   */
  def saveSnapshot(snapshot: Any): Unit
  
  /**
   * Delete specific snapshot by sequence number.
   *
   * @param sequenceNr sequence number identifying the snapshot to delete
   */
  def deleteSnapshot(sequenceNr: Long): Unit
  
  /**
   * Delete snapshots matching criteria.
   *
   * @param criteria sequence-number / timestamp bounds selecting the snapshots to delete
   */
  def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit
  
  /**
   * Load snapshot for given persistence ID and criteria.
   *
   * @param persistenceId id of the persistent actor whose snapshot is requested
   * @param criteria      bounds the candidate snapshots must satisfy
   * @param toSequenceNr  sequence-number bound passed with the request
   *                      NOTE(review): presumably the upper bound recovery will replay to —
   *                      confirm against the Akka API documentation
   */
  def loadSnapshot(
    persistenceId: String, 
    criteria: SnapshotSelectionCriteria, 
    toSequenceNr: Long
  ): Unit
}

Snapshot Metadata

Metadata associated with snapshots.

/**
 * Identification and timing information stored alongside a snapshot.
 *
 * @param persistenceId id of the persistent actor the snapshot belongs to
 * @param sequenceNr    sequence number associated with the snapshot
 * @param timestamp     time associated with the snapshot
 *                      (NOTE(review): presumably epoch milliseconds — confirm)
 * @param metadata      optional extra metadata attached to the snapshot
 */
final class SnapshotMetadata(
  val persistenceId: String,
  val sequenceNr: Long,
  val timestamp: Long,
  val metadata: Option[Any]
) {

  /** Three-argument constructor: builds metadata with no extra metadata attached. */
  def this(persistenceId: String, sequenceNr: Long, timestamp: Long) = {
    this(persistenceId, sequenceNr, timestamp, Option.empty[Any])
  }

  /**
   * Returns a new instance with the same id, sequence number and timestamp and
   * the given value wrapped as its metadata.
   */
  def withMetadata(metadata: Any): SnapshotMetadata = {
    val attached: Option[Any] = Some(metadata)
    SnapshotMetadata(persistenceId, sequenceNr, timestamp, attached)
  }

  /**
   * Case-class style `copy`, kept for binary compatibility.
   * Only the three identifying fields can be replaced; the current `metadata`
   * value is always carried over unchanged.
   */
  def copy(
    persistenceId: String = this.persistenceId,
    sequenceNr: Long = this.sequenceNr,
    timestamp: Long = this.timestamp
  ): SnapshotMetadata = {
    val carriedOver = this.metadata
    new SnapshotMetadata(persistenceId, sequenceNr, timestamp, carriedOver)
  }
}

object SnapshotMetadata {

  /** Factory taking all four fields; delegates directly to the primary constructor. */
  def apply(
    persistenceId: String,
    sequenceNr: Long,
    timestamp: Long,
    metadata: Option[Any]
  ): SnapshotMetadata = {
    new SnapshotMetadata(persistenceId, sequenceNr, timestamp, metadata)
  }
}

Snapshot Selection Criteria

Criteria for selecting which snapshots to load or delete.

/**
 * Bounds used to pick snapshots when loading (recovery) or deleting.
 *
 * With all defaults the upper bounds are `Long.MaxValue` and the lower bounds
 * are `0L`.
 *
 * @param maxSequenceNr upper bound on a snapshot's sequence number
 * @param maxTimestamp  upper bound on a snapshot's timestamp
 * @param minSequenceNr lower bound on a snapshot's sequence number
 * @param minTimestamp  lower bound on a snapshot's timestamp
 */
case class SnapshotSelectionCriteria(
  maxSequenceNr: Long = Long.MaxValue,
  maxTimestamp: Long = Long.MaxValue,
  minSequenceNr: Long = 0L,
  minTimestamp: Long = 0L
)

object SnapshotSelectionCriteria {

  /** Criteria built entirely from the defaults; used to select the latest available snapshot. */
  val Latest: SnapshotSelectionCriteria =
    new SnapshotSelectionCriteria()

  /**
   * Criteria whose two upper bounds are both `0L` (lower bounds keep their
   * defaults); used to select no snapshot at all.
   */
  val None: SnapshotSelectionCriteria =
    new SnapshotSelectionCriteria(maxSequenceNr = 0L, maxTimestamp = 0L)
}

Usage Examples:

// Select latest snapshot
val latestCriteria = SnapshotSelectionCriteria.Latest

// Select snapshots up to sequence number 1000
val upTo1000 = SnapshotSelectionCriteria(maxSequenceNr = 1000L)

// Select snapshots from specific time range (the last 24 hours).
// The clock is read once so both bounds are derived from the same instant
// and the window is exactly one day wide.
val now = System.currentTimeMillis()
val oneDayMillis = 24L * 60L * 60L * 1000L
val timeRange = SnapshotSelectionCriteria(
  maxTimestamp = now,
  minTimestamp = now - oneDayMillis // 24 hours ago
)

// Select specific sequence number range
val seqRange = SnapshotSelectionCriteria(
  maxSequenceNr = 1000L,
  minSequenceNr = 500L
)

Snapshot Recovery

Messages used during the snapshot recovery process.

/**
 * Snapshot offer sent during recovery.
 *
 * In the usage example on this page it is matched in `receiveRecover` to
 * restore the actor's state from the offered snapshot.
 *
 * @param metadata identification and timing information of the offered snapshot
 * @param snapshot the snapshot payload; typed `Any`, so receivers match on the type they expect
 */
case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)

/**
 * Plugin API selected snapshot representation.
 *
 * This is the element type returned (inside `Future[Option[...]]`) by
 * `SnapshotStore.loadAsync`.
 *
 * @param metadata identification and timing information of the selected snapshot
 * @param snapshot the snapshot payload
 */
case class SelectedSnapshot(metadata: SnapshotMetadata, snapshot: Any)

Snapshot Response Messages

Success Responses

/**
 * Successful snapshot save confirmation.
 *
 * @param metadata metadata of the snapshot that was saved
 */
case class SaveSnapshotSuccess(metadata: SnapshotMetadata)

/**
 * Successful snapshot deletion confirmation (single snapshot).
 *
 * @param metadata metadata of the snapshot that was deleted
 */
case class DeleteSnapshotSuccess(metadata: SnapshotMetadata)

/**
 * Successful deletion of snapshot range.
 *
 * @param criteria the selection criteria the deletion was requested with
 */
case class DeleteSnapshotsSuccess(criteria: SnapshotSelectionCriteria)

Failure Responses

/**
 * Failed snapshot save notification.
 *
 * @param metadata metadata of the snapshot whose save failed
 * @param cause    the error reported for the failed save
 */
case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)

/**
 * Failed snapshot deletion notification (single snapshot).
 *
 * @param metadata metadata of the snapshot whose deletion failed
 * @param cause    the error reported for the failed deletion
 */
case class DeleteSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)

/**
 * Failed deletion of snapshot range.
 *
 * @param criteria the selection criteria the deletion was requested with
 * @param cause    the error reported for the failed deletion
 */
case class DeleteSnapshotsFailure(criteria: SnapshotSelectionCriteria, cause: Throwable)

Snapshot Store Plugin API

Base trait for implementing snapshot store plugins.

/**
 * Abstract snapshot store base trait for plugin implementations.
 *
 * A plugin mixes this trait in (it is an `Actor with ActorLogging`) and
 * implements the four asynchronous operations below. All of them return a
 * `Future`, so none of them hands back its result synchronously.
 */
trait SnapshotStore extends Actor with ActorLogging {
  
  /**
   * Load snapshot matching criteria.
   *
   * @param persistenceId id of the persistent actor whose snapshot is requested
   * @param criteria      bounds the snapshot must satisfy
   * @return a future holding the selected snapshot, or `None`
   *         (NOTE(review): presumably `None` means no snapshot matched — confirm)
   */
  protected def loadAsync(
    persistenceId: String,
    criteria: SnapshotSelectionCriteria
  ): Future[Option[SelectedSnapshot]]
  
  /**
   * Save snapshot.
   *
   * @param metadata identification and timing information for the snapshot
   * @param snapshot the snapshot payload to store
   */
  protected def saveAsync(
    metadata: SnapshotMetadata,
    snapshot: Any
  ): Future[Unit]
  
  /**
   * Delete snapshot by metadata (single-snapshot overload).
   *
   * @param metadata metadata identifying the snapshot to delete
   */
  protected def deleteAsync(metadata: SnapshotMetadata): Future[Unit]
  
  /**
   * Delete snapshots matching criteria (range overload).
   *
   * @param persistenceId id of the persistent actor whose snapshots are deleted
   * @param criteria      bounds selecting the snapshots to delete
   */
  protected def deleteAsync(
    persistenceId: String,
    criteria: SnapshotSelectionCriteria
  ): Future[Unit]
}

Example: Snapshot Usage in Persistent Actor

import akka.persistence._

/**
 * State of the example counter; this is also the payload saved as a snapshot.
 *
 * @param value      current counter value
 * @param lastUpdate time of the last change, taken from `System.currentTimeMillis()`
 */
case class CounterState(value: Int, lastUpdate: Long)

/**
 * Example persistent counter that snapshots its state at a fixed event interval.
 *
 * Recovery replays `Incremented` / `Decremented` events and accepts a
 * `SnapshotOffer` carrying a previously saved [[CounterState]]. Commands persist
 * the matching event, apply it, reply to the sender with the new value and save
 * a snapshot whenever `lastSequenceNr` is a multiple of the snapshot interval.
 */
class SnapshotAwareCounter extends PersistentActor {
  override def persistenceId: String = "snapshot-counter"

  /** A snapshot is saved whenever `lastSequenceNr` is a multiple of this value. */
  private val snapshotInterval = 100L

  /** "delete-old-snapshots" removes snapshots at least this many sequence numbers behind. */
  private val retainedSequenceNrs = 200L

  private var state = CounterState(0, System.currentTimeMillis())

  /**
   * The single state transition of this actor. Recovery and command handling
   * both go through it so the two paths cannot drift apart.
   *
   * @param delta amount added to the counter (`1` for an increment, `-1` for a decrement)
   */
  private def applyDelta(delta: Int): Unit =
    state = state.copy(value = state.value + delta, lastUpdate = System.currentTimeMillis())

  /**
   * Runs inside the `persist` callback: applies the change, replies to the
   * sender with the new value and saves a snapshot when the interval is reached.
   */
  private def onPersisted(delta: Int): Unit = {
    applyDelta(delta)
    sender() ! state.value

    // Save snapshot every 100 events
    if (lastSequenceNr % snapshotInterval == 0) {
      saveSnapshot(state)
    }
  }

  override def receiveRecover: Receive = {
    case Incremented => applyDelta(1)
    case Decremented => applyDelta(-1)
    case SnapshotOffer(metadata, snapshot: CounterState) =>
      println(s"Recovered from snapshot at sequence ${metadata.sequenceNr}")
      state = snapshot
  }

  override def receiveCommand: Receive = {
    case Increment =>
      persist(Incremented) { _ => onPersisted(1) }

    case Decrement =>
      persist(Decremented) { _ => onPersisted(-1) }

    case GetValue => sender() ! state.value

    case "force-snapshot" => saveSnapshot(state)

    case "delete-old-snapshots" =>
      // Delete snapshots older than current - 200 sequence numbers
      val criteria = SnapshotSelectionCriteria(maxSequenceNr = lastSequenceNr - retainedSequenceNrs)
      deleteSnapshots(criteria)

    case SaveSnapshotSuccess(metadata) =>
      println(s"Snapshot saved successfully at sequence ${metadata.sequenceNr}")

    case SaveSnapshotFailure(metadata, cause) =>
      println(s"Snapshot save failed at sequence ${metadata.sequenceNr}: ${cause.getMessage}")

    case DeleteSnapshotsSuccess(criteria) =>
      println(s"Old snapshots deleted successfully up to ${criteria.maxSequenceNr}")

    // The criteria are not needed for the failure message, so they are not bound.
    case DeleteSnapshotsFailure(_, cause) =>
      println(s"Failed to delete snapshots: ${cause.getMessage}")
  }
}

Advanced Snapshot Patterns

Conditional Snapshotting

/**
 * Example actor that decides after every persisted event whether to take a
 * snapshot, based on the number of events since the last snapshot request,
 * the size of the state, and elapsed time.
 *
 * `SomeComplexState`, `SomeEvent`, `updateState()` and `lastUpdate` are not
 * defined in this snippet and must be supplied by the surrounding code.
 */
class ConditionalSnapshotActor extends PersistentActor {
  override def persistenceId: String = "conditional-snapshot"

  // Events persisted (or replayed) since the state last captured in a snapshot.
  private var eventsSinceSnapshot = 0
  private var state = SomeComplexState()

  // A persistent actor must define its recovery behavior; the other examples on
  // this page do so as well. Replayed events are counted so the snapshot
  // conditions are correct right after a restart.
  override def receiveRecover: Receive = {
    case SomeEvent =>
      updateState()
      eventsSinceSnapshot += 1

    // NOTE(review): assumes `SomeComplexState` is the type produced by
    // `SomeComplexState()` above — adjust the pattern if it is not.
    case SnapshotOffer(_, snapshot: SomeComplexState) =>
      state = snapshot
      eventsSinceSnapshot = 0
  }

  override def receiveCommand: Receive = {
    case SomeEvent =>
      persist(SomeEvent) { _ =>
        updateState()
        eventsSinceSnapshot += 1

        // Snapshot based on multiple conditions
        // NOTE(review): `lastUpdate` is not defined in this snippet — confirm where it comes from.
        val shouldSnapshot = eventsSinceSnapshot >= 50 ||
                           state.size > 1000 ||
                           (System.currentTimeMillis() - lastUpdate) > 3600000 // 1 hour

        if (shouldSnapshot) {
          saveSnapshot(state)
          // Reset here only: the snapshot captures the state as of this event.
          eventsSinceSnapshot = 0
        }
      }

    case SaveSnapshotSuccess(_) =>
      // Deliberately no reset here. The confirmation arrives asynchronously, so
      // resetting again would discard events persisted since the save request.
      ()
  }
}

Snapshot Metadata Usage

/**
 * Example actor that inspects the optional `metadata` field of
 * `SnapshotMetadata`, both when a save is confirmed and when a snapshot is
 * offered during recovery.
 *
 * `currentState` and `restoreFromSnapshot` are not defined in this snippet and
 * must be supplied by the surrounding code.
 */
class MetadataAwareActor extends PersistentActor {
  override def persistenceId: String = "metadata-aware"

  override def receiveCommand: Receive = {
    case "snapshot-with-metadata" =>
      // `saveSnapshot` accepts only the snapshot payload (see the Snapshotter
      // trait on this page), so no caller-built metadata can be passed here.
      // Whatever is present in `SnapshotMetadata.metadata` is reported back in
      // the `SaveSnapshotSuccess` message handled below.
      saveSnapshot(currentState)

    case SaveSnapshotSuccess(snapshotMeta) =>
      snapshotMeta.metadata.foreach { meta =>
        println(s"Snapshot saved with metadata: $meta")
      }
  }

  override def receiveRecover: Receive = {
    case SnapshotOffer(snapshotMeta, snapshot) =>
      println(s"Recovering from snapshot ${snapshotMeta.sequenceNr} at ${snapshotMeta.timestamp}")
      // `metadata` is an Option, so nothing is printed when none is attached.
      snapshotMeta.metadata.foreach { meta =>
        println(s"Snapshot metadata: $meta")
      }
      restoreFromSnapshot(snapshot)
  }
}

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13

docs

at-least-once-delivery.md

durable-state.md

event-adapters.md

index.md

journal-api.md

persistent-actors.md

plugin-development.md

snapshots.md

tile.json