Akka Persistence is an event sourcing library that provides comprehensive capabilities for building persistent, fault-tolerant applications using the actor model.
—
State snapshot functionality for optimizing recovery performance and managing large event histories.
Core snapshot functionality for persistent actors.
/**
* Snapshot API for persistent actors.
*
* Declares the snapshot operations available to an actor: saving, deleting and loading
* snapshots. Every operation returns `Unit`, so outcomes are not handed back to the caller
* directly; presumably they arrive as the confirmation/failure messages listed later in
* this document (e.g. `SaveSnapshotSuccess` / `SaveSnapshotFailure`) — confirm against
* the Akka documentation.
*/
trait Snapshotter extends Actor {
/** Snapshotter identifier. */
def snapshotterId: String
/** Current snapshot sequence number. */
def snapshotSequenceNr: Long
/**
* Save a snapshot of current state.
*
* @param snapshot the state to store; it is typed `Any`, so no structure is enforced here
*/
def saveSnapshot(snapshot: Any): Unit
/**
* Delete specific snapshot by sequence number.
*
* @param sequenceNr sequence number identifying the snapshot to delete
*/
def deleteSnapshot(sequenceNr: Long): Unit
/**
* Delete snapshots matching criteria.
*
* @param criteria sequence-number and timestamp bounds selecting the snapshots to delete
*/
def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit
/**
* Load snapshot for given persistence ID and criteria.
*
* @param persistenceId identifier of the persistent entity whose snapshot is requested
* @param criteria bounds that a candidate snapshot has to satisfy
* @param toSequenceNr NOTE(review): presumably an upper sequence-number bound used for the
*                     recovery that follows the load — not shown here, confirm upstream
*/
def loadSnapshot(
persistenceId: String,
criteria: SnapshotSelectionCriteria,
toSequenceNr: Long
): Unit
}Metadata associated with snapshots.
/**
* Snapshot metadata containing identification and timing information.
*
* This is a plain `final class`, not a case class, which is why `copy` and the companion
* `apply` are written out by hand. No `equals`/`hashCode`/`toString` overrides are shown in
* this summary.
*
* @param persistenceId identifier of the persistent entity the snapshot belongs to
* @param sequenceNr sequence number associated with the snapshot
* @param timestamp snapshot timestamp (presumably epoch milliseconds — TODO confirm)
* @param metadata optional, untyped extra metadata; `None` when nothing was attached
*/
final class SnapshotMetadata(
val persistenceId: String,
val sequenceNr: Long,
val timestamp: Long,
val metadata: Option[Any]
) {
/** Alternative constructor without metadata; `metadata` is set to `None`. */
def this(persistenceId: String, sequenceNr: Long, timestamp: Long) =
this(persistenceId, sequenceNr, timestamp, None)
/**
* Create copy with additional metadata.
*
* The new instance keeps `persistenceId`, `sequenceNr` and `timestamp`, and holds
* `Some(metadata)`; any metadata already present is replaced, not merged.
*/
def withMetadata(metadata: Any): SnapshotMetadata =
new SnapshotMetadata(persistenceId, sequenceNr, timestamp, Some(metadata))
/**
* Create copy with modified fields (for binary compatibility).
*
* Only the three identification/timing fields can be overridden; the current `metadata`
* value is always carried over unchanged. Use `withMetadata` to change it.
*/
def copy(
persistenceId: String = this.persistenceId,
sequenceNr: Long = this.sequenceNr,
timestamp: Long = this.timestamp
): SnapshotMetadata =
SnapshotMetadata(persistenceId, sequenceNr, timestamp, metadata)
}
/** Companion providing `new`-free construction of [[SnapshotMetadata]]. */
object SnapshotMetadata {
/**
* Factory method; forwards all four arguments to the primary constructor.
*
* @param persistenceId identifier of the persistent entity the snapshot belongs to
* @param sequenceNr sequence number associated with the snapshot
* @param timestamp snapshot timestamp
* @param metadata optional extra metadata, passed through as given
* @return a new [[SnapshotMetadata]] instance
*/
def apply(
persistenceId: String,
sequenceNr: Long,
timestamp: Long,
metadata: Option[Any]
): SnapshotMetadata = new SnapshotMetadata(persistenceId, sequenceNr, timestamp, metadata)
}Criteria for selecting which snapshots to load or delete.
/**
* Criteria for selecting snapshots during recovery or deletion.
*
* Every bound has a default, so `SnapshotSelectionCriteria()` spans sequence numbers and
* timestamps from `0` up to `Long.MaxValue`. Whether the bounds are inclusive is not shown
* in this summary — TODO confirm against the Akka documentation.
*
* @param maxSequenceNr upper bound on the snapshot sequence number (default `Long.MaxValue`)
* @param maxTimestamp upper bound on the snapshot timestamp (default `Long.MaxValue`)
* @param minSequenceNr lower bound on the snapshot sequence number (default `0L`)
* @param minTimestamp lower bound on the snapshot timestamp (default `0L`)
*/
case class SnapshotSelectionCriteria(
maxSequenceNr: Long = Long.MaxValue,
maxTimestamp: Long = Long.MaxValue,
minSequenceNr: Long = 0L,
minTimestamp: Long = 0L
)
/** Predefined [[SnapshotSelectionCriteria]] values. */
object SnapshotSelectionCriteria {
/** Select the latest available snapshot; built with all default bounds. */
val Latest: SnapshotSelectionCriteria = SnapshotSelectionCriteria()
/**
* Do not select any snapshot.
*
* The two positional arguments set `maxSequenceNr = 0` and `maxTimestamp = 0`; the minimum
* bounds keep their `0L` defaults. Note that inside this object the name `None` refers to
* this value rather than to `scala.None`.
*/
val None: SnapshotSelectionCriteria = SnapshotSelectionCriteria(0L, 0L)
}Usage Examples:
// Select latest snapshot
val latestCriteria = SnapshotSelectionCriteria.Latest

// Select snapshots up to sequence number 1000
val upTo1000 = SnapshotSelectionCriteria(maxSequenceNr = 1000L)

// Select snapshots from a specific time range (here: the last 24 hours).
// The clock is read once so both bounds derive from the same instant and the window is
// exactly 24 hours wide.
val timeRange = {
  val now = System.currentTimeMillis()
  val oneDayMillis = 24L * 60 * 60 * 1000
  SnapshotSelectionCriteria(
    maxTimestamp = now,
    minTimestamp = now - oneDayMillis // 24 hours ago
  )
}

// Select specific sequence number range
val seqRange = SnapshotSelectionCriteria(
  maxSequenceNr = 1000L,
  minSequenceNr = 500L
)
Messages used during snapshot recovery process.
/**
* Snapshot offer sent during recovery.
*
* The examples in this document match on it inside `receiveRecover` to restore actor state.
*
* @param metadata identification and timing information of the offered snapshot
* @param snapshot the stored state; untyped, so receivers narrow it with a type pattern
*/
case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
/**
* Plugin API selected snapshot representation.
*
* This is the element type returned (inside `Future[Option[...]]`) by
* `SnapshotStore.loadAsync`.
*
* @param metadata identification and timing information of the selected snapshot
* @param snapshot the stored state
*/
case class SelectedSnapshot(metadata: SnapshotMetadata, snapshot: Any)
/**
* Successful snapshot save confirmation.
*
* @param metadata metadata of the snapshot that was saved
*/
case class SaveSnapshotSuccess(metadata: SnapshotMetadata)
/**
* Successful snapshot deletion confirmation.
*
* @param metadata metadata of the snapshot that was deleted
*/
case class DeleteSnapshotSuccess(metadata: SnapshotMetadata)
/**
* Successful deletion of snapshot range.
*
* @param criteria the criteria that selected the deleted snapshots
*/
case class DeleteSnapshotsSuccess(criteria: SnapshotSelectionCriteria)
/**
* Failed snapshot save notification.
*
* @param metadata metadata of the snapshot that could not be saved
* @param cause the failure that prevented the save
*/
case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
/**
* Failed snapshot deletion notification.
*
* @param metadata metadata of the snapshot that could not be deleted
* @param cause the failure that prevented the deletion
*/
case class DeleteSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
/**
* Failed deletion of snapshot range.
*
* @param criteria the criteria of the deletion request that failed
* @param cause the failure that prevented the deletion
*/
case class DeleteSnapshotsFailure(criteria: SnapshotSelectionCriteria, cause: Throwable)Base class for implementing snapshot store plugins.
/**
* Abstract snapshot store base class for plugin implementations.
*
* A plugin supplies the four `protected` asynchronous operations below. Each one returns a
* `Future`, so completion and failure are reported through that future rather than through
* a direct return value.
*/
trait SnapshotStore extends Actor with ActorLogging {
/**
* Load snapshot matching criteria.
*
* @param persistenceId identifier of the persistent entity whose snapshot is requested
* @param criteria bounds that a candidate snapshot has to satisfy
* @return a future holding the selected snapshot, or `None` when nothing was selected
*/
protected def loadAsync(
persistenceId: String,
criteria: SnapshotSelectionCriteria
): Future[Option[SelectedSnapshot]]
/**
* Save snapshot.
*
* @param metadata identification and timing information for the snapshot
* @param snapshot the state to store
* @return a future that completes when the save has finished
*/
protected def saveAsync(
metadata: SnapshotMetadata,
snapshot: Any
): Future[Unit]
/**
* Delete snapshot by metadata.
*
* @param metadata metadata identifying the snapshot to delete
* @return a future that completes when the deletion has finished
*/
protected def deleteAsync(metadata: SnapshotMetadata): Future[Unit]
/**
* Delete snapshots matching criteria (overload of `deleteAsync` taking selection bounds).
*
* @param persistenceId identifier of the persistent entity whose snapshots are deleted
* @param criteria bounds selecting the snapshots to delete
* @return a future that completes when the deletion has finished
*/
protected def deleteAsync(
persistenceId: String,
criteria: SnapshotSelectionCriteria
): Future[Unit]
}import akka.persistence._
/** Immutable counter state; the whole value is what gets stored as a snapshot. */
final case class CounterState(value: Int, lastUpdate: Long)

/**
 * Counter that persists `Incremented` / `Decremented` events and saves a snapshot every
 * `SnapshotInterval` events.
 *
 * The command and event objects (`Increment`, `Decrement`, `GetValue`, `Incremented`,
 * `Decremented`) are not defined in this example and must be supplied by the application.
 */
class SnapshotAwareCounter extends PersistentActor {
  override def persistenceId: String = "snapshot-counter"

  /** Number of persisted events between automatic snapshots. */
  private val SnapshotInterval = 100

  private var state = CounterState(0, System.currentTimeMillis())

  /** Applies a counter change; shared by recovery and command handling so both stay in sync. */
  private def updateState(delta: Int): Unit =
    state = state.copy(value = state.value + delta, lastUpdate = System.currentTimeMillis())

  /** Saves a snapshot whenever the last sequence number is a multiple of the interval. */
  private def snapshotIfDue(): Unit =
    if (lastSequenceNr % SnapshotInterval == 0) {
      saveSnapshot(state)
    }

  override def receiveRecover: Receive = {
    case Incremented => updateState(1)
    case Decremented => updateState(-1)
    case SnapshotOffer(metadata, snapshot: CounterState) =>
      println(s"Recovered from snapshot at sequence ${metadata.sequenceNr}")
      state = snapshot
  }

  override def receiveCommand: Receive = {
    case Increment =>
      persist(Incremented) { _ =>
        updateState(1)
        sender() ! state.value
        // Save snapshot every 100 events
        snapshotIfDue()
      }
    case Decrement =>
      persist(Decremented) { _ =>
        updateState(-1)
        sender() ! state.value
        snapshotIfDue()
      }
    case GetValue => sender() ! state.value
    case "force-snapshot" => saveSnapshot(state)
    case "delete-old-snapshots" =>
      // Delete snapshots older than current - 200 sequence numbers
      val criteria = SnapshotSelectionCriteria(maxSequenceNr = lastSequenceNr - 200)
      deleteSnapshots(criteria)
    case SaveSnapshotSuccess(metadata) =>
      println(s"Snapshot saved successfully at sequence ${metadata.sequenceNr}")
    case SaveSnapshotFailure(metadata, cause) =>
      println(s"Snapshot save failed at sequence ${metadata.sequenceNr}: ${cause.getMessage}")
    case DeleteSnapshotsSuccess(criteria) =>
      println(s"Old snapshots deleted successfully up to ${criteria.maxSequenceNr}")
    case DeleteSnapshotsFailure(_, cause) =>
      println(s"Failed to delete snapshots: ${cause.getMessage}")
  }
}

/**
 * Actor that snapshots when any of several conditions holds: enough events since the last
 * snapshot, a large state, or too much time since the last snapshot request.
 *
 * `SomeEvent`, `SomeComplexState` and `updateState()` are placeholders that the application
 * has to provide.
 */
class ConditionalSnapshotActor extends PersistentActor {
  override def persistenceId: String = "conditional-snapshot"

  private val MaxEventsBetweenSnapshots = 50
  private val MaxStateSize = 1000
  private val MaxSnapshotAgeMillis = 60L * 60 * 1000 // 1 hour

  private var eventsSinceSnapshot = 0
  // Time of the most recent snapshot request; drives the age-based condition below.
  private var lastSnapshotRequestedAt = System.currentTimeMillis()
  private var state = SomeComplexState()

  // PersistentActor requires a recovery handler; replay applies the same state change as
  // command handling so the recovered state matches the persisted history.
  override def receiveRecover: Receive = {
    case SomeEvent =>
      updateState()
      eventsSinceSnapshot += 1
    // Assumes SomeComplexState is the type of `state` — adjust to the real state type.
    case SnapshotOffer(_, snapshot: SomeComplexState) =>
      state = snapshot
      eventsSinceSnapshot = 0
  }

  override def receiveCommand: Receive = {
    case SomeEvent =>
      persist(SomeEvent) { _ =>
        updateState()
        eventsSinceSnapshot += 1
        val now = System.currentTimeMillis()
        // Snapshot based on multiple conditions
        val shouldSnapshot =
          eventsSinceSnapshot >= MaxEventsBetweenSnapshots ||
            state.size > MaxStateSize ||
            (now - lastSnapshotRequestedAt) > MaxSnapshotAgeMillis
        if (shouldSnapshot) {
          saveSnapshot(state)
          // Reset at request time so later events do not trigger duplicate saves while
          // this one is still in flight.
          eventsSinceSnapshot = 0
          lastSnapshotRequestedAt = now
        }
      }
    case SaveSnapshotSuccess(_) =>
      // Nothing to reset here: the counters were reset when the snapshot was requested, and
      // resetting again would discard events persisted while the save was in flight.
      ()
    case SaveSnapshotFailure(metadata, cause) =>
      println(s"Snapshot save failed at sequence ${metadata.sequenceNr}: ${cause.getMessage}")
  }
}

/**
 * Actor that prints any metadata attached to a snapshot when the save is confirmed and when
 * a snapshot is offered during recovery.
 *
 * `currentState` and `restoreFromSnapshot` are placeholders that the application has to
 * provide.
 */
class MetadataAwareActor extends PersistentActor {
  override def persistenceId: String = "metadata-aware"

  override def receiveCommand: Receive = {
    case "snapshot-with-metadata" =>
      // `Snapshotter.saveSnapshot` as documented in this file takes only the snapshot
      // payload, so metadata cannot be passed here. Whatever shows up in
      // `SnapshotMetadata.metadata` was attached elsewhere (presumably by the snapshot
      // store plugin — confirm for the plugin in use).
      saveSnapshot(currentState)
    case SaveSnapshotSuccess(metadata) =>
      metadata.metadata.foreach { meta =>
        println(s"Snapshot saved with metadata: $meta")
      }
  }

  override def receiveRecover: Receive = {
    case SnapshotOffer(metadata, snapshot) =>
      println(s"Recovering from snapshot ${metadata.sequenceNr} at ${metadata.timestamp}")
      metadata.metadata.foreach { meta =>
        println(s"Snapshot metadata: $meta")
      }
      restoreFromSnapshot(snapshot)
  }
}
Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13