CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-persistence-2-13

Event sourcing library for Akka persistence providing comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model.

Pending
Overview
Eval results
Files

durable-state.mddocs/

Durable State Management

Durable state storage for mutable state management with revision tracking and pluggable storage backends.

Capabilities

DurableStateStore (Scala API)

API for reading durable state objects with type safety.

/**
 * API for reading durable state objects
 */
trait DurableStateStore[A] {
  /** Retrieve state object by persistence ID */ 
  def getObject(persistenceId: String): Future[GetObjectResult[A]]
}

GetObjectResult

Result container for durable state retrieval operations.

/**
 * Result of durable state retrieval containing value and revision
 */
case class GetObjectResult[A](
  value: Option[A],
  revision: Long
) {
  /** Convert to Java API result */
  def toJava: JGetObjectResult[A] = JGetObjectResult(value.asJava, revision)
}

Usage Examples:

import akka.persistence.state.scaladsl.DurableStateStore
import scala.concurrent.Future

// Basic state retrieval
val stateStore: DurableStateStore[UserProfile] = // ... obtain store
val userStateFuture: Future[GetObjectResult[UserProfile]] = 
  stateStore.getObject("user-123")

userStateFuture.foreach { result =>
  result.value match {
    case Some(profile) => 
      println(s"User profile at revision ${result.revision}: $profile")
    case None => 
      println("No state found for user-123")
  }
}

DurableStateUpdateStore (Scala API)

Extended API for reading and updating durable state objects.

/**
 * API for reading and updating durable state objects
 */
trait DurableStateUpdateStore[A] extends DurableStateStore[A] {
  /** 
   * Upsert state object with optimistic concurrency control
   * @param persistenceId Unique identifier for the state
   * @param revision Expected current revision (starts at 1)
   * @param value New state value
   * @param tag Optional tag for the update
   * @return Future completed when operation finishes
   */
  def upsertObject(
    persistenceId: String,
    revision: Long,
    value: A,
    tag: String
  ): Future[Done]
  
  /**
   * Delete state object
   * @param persistenceId Unique identifier for the state  
   * @param revision Expected current revision
   * @return Future completed when operation finishes
   */
  def deleteObject(
    persistenceId: String,
    revision: Long
  ): Future[Done]
}

Java API

DurableStateStore (Java API)

/**
 * Java API for reading durable state objects
 */
trait DurableStateStore[A] {
  /** Retrieve state object by persistence ID */
  def getObject(persistenceId: String): CompletionStage[JGetObjectResult[A]]
}

JGetObjectResult

/**
 * Java API result of durable state retrieval
 */
case class JGetObjectResult[A](
  value: Optional[A],
  revision: Long
)

DurableStateUpdateStore (Java API)

/**
 * Java API for reading and updating durable state objects
 */
trait DurableStateUpdateStore[A] extends DurableStateStore[A] {
  /** Upsert state object with optimistic concurrency control */
  def upsertObject(
    persistenceId: String,
    revision: Long,
    value: A,
    tag: String
  ): CompletionStage[Done]
  
  /** Delete state object */
  def deleteObject(
    persistenceId: String, 
    revision: Long
  ): CompletionStage[Done]
}

Durable State Registry and Provider

DurableStateStoreRegistry

Registry for obtaining durable state store instances.

/**
 * Registry for obtaining configured durable state store instances
 */
object DurableStateStoreRegistry extends ExtensionId[DurableStateStoreRegistry] {
  /** Get durable state store for the specified plugin ID */
  def durableStateStoreFor[A](
    system: ActorSystem,
    pluginId: String
  ): DurableStateStore[A]
}

class DurableStateStoreRegistry(system: ExtendedActorSystem) extends Extension {
  /** Get durable state store by plugin ID */
  def durableStateStoreFor[A](pluginId: String): DurableStateStore[A]
}

DurableStateStoreProvider

Provider interface for durable state store plugins.

/**
 * Provider interface for durable state store plugin implementations
 */
trait DurableStateStoreProvider {
  /** Create durable state store instance */
  def scaladslDurableStateStore(): DurableStateStore[Any]
  
  /** Create Java API durable state store instance */
  def javadslDurableStateStore(): JDurableStateStore[AnyRef]
}

Example: User Profile Management

import akka.persistence.state.scaladsl.{DurableStateStore, DurableStateUpdateStore}
import akka.actor.ActorSystem
import akka.Done
import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Success, Failure}

// Domain model
case class UserProfile(
  userId: String,
  name: String,
  email: String,
  preferences: Map[String, Any],
  lastModified: Long
)

class UserProfileService(
  stateStore: DurableStateUpdateStore[UserProfile]
)(implicit ec: ExecutionContext) {
  
  def getProfile(userId: String): Future[Option[UserProfile]] = {
    stateStore.getObject(s"user-profile-$userId").map(_.value)
  }
  
  def createProfile(profile: UserProfile): Future[Done] = {
    val profileWithTimestamp = profile.copy(lastModified = System.currentTimeMillis())
    stateStore.upsertObject(
      persistenceId = s"user-profile-${profile.userId}",
      revision = 0L, // New profile
      value = profileWithTimestamp,
      tag = ""
    )
  }
  
  def updateProfile(userId: String, updater: UserProfile => UserProfile): Future[Option[Done]] = {
    for {
      current <- stateStore.getObject(s"user-profile-$userId")
      result <- current.value match {
        case Some(profile) =>
          val updated = updater(profile).copy(lastModified = System.currentTimeMillis())
          stateStore.upsertObject(
            persistenceId = s"user-profile-$userId",
            revision = current.revision,
            value = updated,
            tag = ""
          ).map(result => Some(result))
        case None =>
          Future.successful(None)
      }
    } yield result
  }
  
  def deleteProfile(userId: String): Future[Boolean] = {
    for {
      current <- stateStore.getObject(s"user-profile-$userId")
      result <- current.value match {
        case Some(_) =>
          stateStore.deleteObject(
            persistenceId = s"user-profile-$userId",
            revision = current.revision
          ).map(_ => true)
        case None =>
          Future.successful(false)
      }
    } yield result
  }
}

// Usage example
implicit val system: ActorSystem = ActorSystem("user-service")
implicit val ec: ExecutionContext = system.dispatcher

val stateStore: DurableStateUpdateStore[UserProfile] = 
  DurableStateStoreRegistry(system).durableStateStoreFor("my-state-store")

val userService = new UserProfileService(stateStore)

// Create new profile
val newProfile = UserProfile(
  userId = "user123",
  name = "John Doe", 
  email = "john@example.com",
  preferences = Map("theme" -> "dark", "notifications" -> true),
  lastModified = 0L
)

userService.createProfile(newProfile).onComplete {
  case Success(_) => println("Profile created successfully")
  case Failure(ex) => println(s"Failed to create profile: ${ex.getMessage}")
}

// Update profile
userService.updateProfile("user123", profile => 
  profile.copy(preferences = profile.preferences + ("language" -> "en"))
).onComplete {
  case Success(Some(_)) => println("Profile updated successfully")
  case Success(None) => println("Profile not found")
  case Failure(ex) => println(s"Failed to update profile: ${ex.getMessage}")
}

Example: Configuration Management

import akka.persistence.state.scaladsl.DurableStateUpdateStore
import akka.Done
import scala.concurrent.{Future, ExecutionContext}

case class ApplicationConfig(
  version: String,
  features: Set[String],
  settings: Map[String, Any],
  environment: String
)

class ConfigurationManager(
  stateStore: DurableStateUpdateStore[ApplicationConfig]
)(implicit ec: ExecutionContext) {
  
  private val configId = "application-config"
  
  def getCurrentConfig: Future[ApplicationConfig] = {
    stateStore.getObject(configId).map { result =>
      result.value.getOrElse(getDefaultConfig)
    }
  }
  
  def updateConfig(config: ApplicationConfig): Future[Done] = {
    for {
      current <- stateStore.getObject(configId)
      revision = current.revision
      result <- stateStore.upsertObject(configId, revision, config, "")
    } yield result
  }
  
  def enableFeature(feature: String): Future[Done] = {
    updateConfigField { config =>
      config.copy(features = config.features + feature)
    }
  }
  
  def disableFeature(feature: String): Future[Done] = {
    updateConfigField { config =>
      config.copy(features = config.features - feature)
    }
  }
  
  def updateSetting(key: String, value: Any): Future[Done] = {
    updateConfigField { config =>
      config.copy(settings = config.settings + (key -> value))
    }
  }
  
  private def updateConfigField(updater: ApplicationConfig => ApplicationConfig): Future[Done] = {
    for {
      current <- getCurrentConfig
      updated = updater(current)
      result <- updateConfig(updated)
    } yield result
  }
  
  private def getDefaultConfig: ApplicationConfig = {
    ApplicationConfig(
      version = "1.0.0",
      features = Set.empty,
      settings = Map.empty,
      environment = "development"
    )
  }
}

Error Handling and Optimistic Concurrency

import akka.persistence.state.{DurableStateStoreException, RevisionMismatchException}

class SafeStateManager[T](
  stateStore: DurableStateUpdateStore[T]
)(implicit ec: ExecutionContext) {
  
  def safeUpdate(
    persistenceId: String,
    updater: T => T,
    maxRetries: Int = 3
  ): Future[Either[String, Long]] = {
    
    def attemptUpdate(retryCount: Int): Future[Either[String, Long]] = {
      for {
        current <- stateStore.getObject(persistenceId)
        result <- current.value match {
          case Some(value) =>
            val updated = updater(value)
            stateStore.upsertObject(persistenceId, current.revision, updated, "")
              .map(_ => Right(current.revision + 1))
              .recover {
                case _: RevisionMismatchException if retryCount < maxRetries =>
                  // Retry on revision mismatch (optimistic concurrency conflict)
                  Left("revision_mismatch")
                case ex: DurableStateStoreException =>
                  Left(s"Store error: ${ex.getMessage}")
                case ex =>
                  Left(s"Unexpected error: ${ex.getMessage}")
              }
          case None =>
            Future.successful(Left("not_found"))
        }
        finalResult <- result match {
          case Left("revision_mismatch") => attemptUpdate(retryCount + 1)
          case other => Future.successful(other)
        }
      } yield finalResult
    }
    
    attemptUpdate(0)
  }
}

Configuration

Durable state stores are configured in application.conf:

akka.persistence.state {
  plugin = "akka.persistence.state.inmem"
  
  # In-memory state store (for testing)
  inmem {
    class = "akka.persistence.state.InMemDurableStateStoreProvider"
  }
  
  # Custom state store plugin
  my-state-store {
    class = "com.example.MyDurableStateStoreProvider"
    
    # Plugin-specific configuration
    connection-string = "postgresql://localhost/mydb"
    table-name = "durable_state"
  }
}

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13

docs

at-least-once-delivery.md

durable-state.md

event-adapters.md

index.md

journal-api.md

persistent-actors.md

plugin-development.md

snapshots.md

tile.json