Event sourcing library for Akka persistence providing comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model.
—
Durable state storage for mutable state management with revision tracking and pluggable storage backends.
API for reading durable state objects with type safety.
/**
* API for reading durable state objects
*/
trait DurableStateStore[A] {
/** Retrieve state object by persistence ID */
def getObject(persistenceId: String): Future[GetObjectResult[A]]
}Result container for durable state retrieval operations.
/**
* Result of durable state retrieval containing value and revision
*/
case class GetObjectResult[A](
value: Option[A],
revision: Long
) {
/** Convert to Java API result */
def toJava: JGetObjectResult[A] = JGetObjectResult(value.asJava, revision)
}Usage Examples:
import akka.persistence.state.scaladsl.DurableStateStore
import scala.concurrent.Future
// Basic state retrieval
val stateStore: DurableStateStore[UserProfile] = // ... obtain store
val userStateFuture: Future[GetObjectResult[UserProfile]] =
stateStore.getObject("user-123")
userStateFuture.foreach { result =>
result.value match {
case Some(profile) =>
println(s"User profile at revision ${result.revision}: $profile")
case None =>
println("No state found for user-123")
}
}Extended API for reading and updating durable state objects.
/**
* API for reading and updating durable state objects
*/
trait DurableStateUpdateStore[A] extends DurableStateStore[A] {
/**
* Upsert state object with optimistic concurrency control
* @param persistenceId Unique identifier for the state
* @param revision Expected current revision (starts at 1)
* @param value New state value
* @param tag Optional tag for the update
* @return Future completed when operation finishes
*/
def upsertObject(
persistenceId: String,
revision: Long,
value: A,
tag: String
): Future[Done]
/**
* Delete state object
* @param persistenceId Unique identifier for the state
* @param revision Expected current revision
* @return Future completed when operation finishes
*/
def deleteObject(
persistenceId: String,
revision: Long
): Future[Done]
}/**
* Java API for reading durable state objects
*/
trait DurableStateStore[A] {
/** Retrieve state object by persistence ID */
def getObject(persistenceId: String): CompletionStage[JGetObjectResult[A]]
}/**
* Java API result of durable state retrieval
*/
case class JGetObjectResult[A](
value: Optional[A],
revision: Long
)/**
* Java API for reading and updating durable state objects
*/
trait DurableStateUpdateStore[A] extends DurableStateStore[A] {
/** Upsert state object with optimistic concurrency control */
def upsertObject(
persistenceId: String,
revision: Long,
value: A,
tag: String
): CompletionStage[Done]
/** Delete state object */
def deleteObject(
persistenceId: String,
revision: Long
): CompletionStage[Done]
}Registry for obtaining durable state store instances.
/**
* Registry for obtaining configured durable state store instances
*/
object DurableStateStoreRegistry extends ExtensionId[DurableStateStoreRegistry] {
/** Get durable state store for the specified plugin ID */
def durableStateStoreFor[A](
system: ActorSystem,
pluginId: String
): DurableStateStore[A]
}
class DurableStateStoreRegistry(system: ExtendedActorSystem) extends Extension {
/** Get durable state store by plugin ID */
def durableStateStoreFor[A](pluginId: String): DurableStateStore[A]
}Provider interface for durable state store plugins.
/**
* Provider interface for durable state store plugin implementations
*/
trait DurableStateStoreProvider {
/** Create durable state store instance */
def scaladslDurableStateStore(): DurableStateStore[Any]
/** Create Java API durable state store instance */
def javadslDurableStateStore(): JDurableStateStore[AnyRef]
}import akka.persistence.state.scaladsl.{DurableStateStore, DurableStateUpdateStore}
import akka.actor.ActorSystem
import akka.Done
import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Success, Failure}
// Domain model
case class UserProfile(
userId: String,
name: String,
email: String,
preferences: Map[String, Any],
lastModified: Long
)
class UserProfileService(
stateStore: DurableStateUpdateStore[UserProfile]
)(implicit ec: ExecutionContext) {
def getProfile(userId: String): Future[Option[UserProfile]] = {
stateStore.getObject(s"user-profile-$userId").map(_.value)
}
def createProfile(profile: UserProfile): Future[Done] = {
val profileWithTimestamp = profile.copy(lastModified = System.currentTimeMillis())
stateStore.upsertObject(
persistenceId = s"user-profile-${profile.userId}",
revision = 0L, // New profile
value = profileWithTimestamp,
tag = ""
)
}
def updateProfile(userId: String, updater: UserProfile => UserProfile): Future[Option[Done]] = {
for {
current <- stateStore.getObject(s"user-profile-$userId")
result <- current.value match {
case Some(profile) =>
val updated = updater(profile).copy(lastModified = System.currentTimeMillis())
stateStore.upsertObject(
persistenceId = s"user-profile-$userId",
revision = current.revision,
value = updated,
tag = ""
).map(result => Some(result))
case None =>
Future.successful(None)
}
} yield result
}
def deleteProfile(userId: String): Future[Boolean] = {
for {
current <- stateStore.getObject(s"user-profile-$userId")
result <- current.value match {
case Some(_) =>
stateStore.deleteObject(
persistenceId = s"user-profile-$userId",
revision = current.revision
).map(_ => true)
case None =>
Future.successful(false)
}
} yield result
}
}
// Usage example
implicit val system: ActorSystem = ActorSystem("user-service")
implicit val ec: ExecutionContext = system.dispatcher
val stateStore: DurableStateUpdateStore[UserProfile] =
DurableStateStoreRegistry(system).durableStateStoreFor("my-state-store")
val userService = new UserProfileService(stateStore)
// Create new profile
val newProfile = UserProfile(
userId = "user123",
name = "John Doe",
email = "john@example.com",
preferences = Map("theme" -> "dark", "notifications" -> true),
lastModified = 0L
)
userService.createProfile(newProfile).onComplete {
case Success(_) => println("Profile created successfully")
case Failure(ex) => println(s"Failed to create profile: ${ex.getMessage}")
}
// Update profile
userService.updateProfile("user123", profile =>
profile.copy(preferences = profile.preferences + ("language" -> "en"))
).onComplete {
case Success(Some(_)) => println("Profile updated successfully")
case Success(None) => println("Profile not found")
case Failure(ex) => println(s"Failed to update profile: ${ex.getMessage}")
}import akka.persistence.state.scaladsl.DurableStateUpdateStore
import akka.Done
import scala.concurrent.{Future, ExecutionContext}
case class ApplicationConfig(
version: String,
features: Set[String],
settings: Map[String, Any],
environment: String
)
class ConfigurationManager(
stateStore: DurableStateUpdateStore[ApplicationConfig]
)(implicit ec: ExecutionContext) {
private val configId = "application-config"
def getCurrentConfig: Future[ApplicationConfig] = {
stateStore.getObject(configId).map { result =>
result.value.getOrElse(getDefaultConfig)
}
}
def updateConfig(config: ApplicationConfig): Future[Done] = {
for {
current <- stateStore.getObject(configId)
revision = current.revision
result <- stateStore.upsertObject(configId, revision, config, "")
} yield result
}
def enableFeature(feature: String): Future[Done] = {
updateConfigField { config =>
config.copy(features = config.features + feature)
}
}
def disableFeature(feature: String): Future[Done] = {
updateConfigField { config =>
config.copy(features = config.features - feature)
}
}
def updateSetting(key: String, value: Any): Future[Done] = {
updateConfigField { config =>
config.copy(settings = config.settings + (key -> value))
}
}
private def updateConfigField(updater: ApplicationConfig => ApplicationConfig): Future[Done] = {
for {
current <- getCurrentConfig
updated = updater(current)
result <- updateConfig(updated)
} yield result
}
private def getDefaultConfig: ApplicationConfig = {
ApplicationConfig(
version = "1.0.0",
features = Set.empty,
settings = Map.empty,
environment = "development"
)
}
}import akka.persistence.state.{DurableStateStoreException, RevisionMismatchException}
class SafeStateManager[T](
stateStore: DurableStateUpdateStore[T]
)(implicit ec: ExecutionContext) {
def safeUpdate(
persistenceId: String,
updater: T => T,
maxRetries: Int = 3
): Future[Either[String, Long]] = {
def attemptUpdate(retryCount: Int): Future[Either[String, Long]] = {
for {
current <- stateStore.getObject(persistenceId)
result <- current.value match {
case Some(value) =>
val updated = updater(value)
stateStore.upsertObject(persistenceId, current.revision, updated, "")
.map(_ => Right(current.revision + 1))
.recover {
case _: RevisionMismatchException if retryCount < maxRetries =>
// Retry on revision mismatch (optimistic concurrency conflict)
Left("revision_mismatch")
case ex: DurableStateStoreException =>
Left(s"Store error: ${ex.getMessage}")
case ex =>
Left(s"Unexpected error: ${ex.getMessage}")
}
case None =>
Future.successful(Left("not_found"))
}
finalResult <- result match {
case Left("revision_mismatch") => attemptUpdate(retryCount + 1)
case other => Future.successful(other)
}
} yield finalResult
}
attemptUpdate(0)
}
}Durable state stores are configured in application.conf:
akka.persistence.state {
plugin = "akka.persistence.state.inmem"
# In-memory state store (for testing)
inmem {
class = "akka.persistence.state.InMemDurableStateStoreProvider"
}
# Custom state store plugin
my-state-store {
class = "com.example.MyDurableStateStoreProvider"
# Plugin-specific configuration
connection-string = "postgresql://localhost/mydb"
table-name = "durable_state"
}
}Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13