Durable state queries provide interfaces for querying state changes and persistence IDs from durable state stores. These complement event-based queries by focusing on state changes rather than individual events.
Base trait and implementations for representing state changes.
/**
* The DurableStateStoreQuery stream elements for DurableStateStoreQuery.
* The implementation can be UpdatedDurableState or DeletedDurableState.
* Not for user extension.
*/
sealed trait DurableStateChange[A] {
/** The persistence id of the origin entity */
def persistenceId: String
/** The offset that can be used in next changes or currentChanges query */
def offset: Offset
}Represents an updated state change with the new value.
/**
* Updated durable state change containing the new state value.
*
* @param persistenceId The persistence id of the origin entity
* @param revision The revision number from the origin entity
* @param value The new state value
* @param offset The offset that can be used in next changes or currentChanges query
* @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
*/
final class UpdatedDurableState[A](
val persistenceId: String,
val revision: Long,
val value: A,
override val offset: Offset,
val timestamp: Long
) extends DurableStateChange[A]Usage Examples:
import akka.persistence.query.{UpdatedDurableState, Sequence}
// Pattern matching on state changes
readJournal
.changes("user-states", offset)
.runForeach {
case updated: UpdatedDurableState[UserState] =>
println(s"User ${updated.persistenceId} updated to revision ${updated.revision}")
println(s"New state: ${updated.value}")
println(s"Updated at: ${updated.timestamp}")
// Process the updated state
processUserStateUpdate(updated.value)
case deleted: DeletedDurableState[UserState] =>
println(s"User ${deleted.persistenceId} deleted at revision ${deleted.revision}")
processUserStateDeletion(deleted.persistenceId)
}
// Extract fields using unapply
readJournal
.changes("user-states", offset)
.runForeach {
case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) =>
println(s"Updated: $persistenceId at revision $revision")
updateProjection(persistenceId, value.asInstanceOf[UserState])
}Represents a deleted state change.
/**
* Deleted durable state change indicating the state was removed.
*
* @param persistenceId The persistence id of the origin entity
* @param revision The revision number from the origin entity
* @param offset The offset that can be used in next changes or currentChanges query
* @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
*/
final class DeletedDurableState[A](
val persistenceId: String,
val revision: Long,
override val offset: Offset,
val timestamp: Long
) extends DurableStateChange[A]Usage Examples:
import akka.persistence.query.DeletedDurableState
// Handle deleted state
readJournal
.changes("user-states", offset)
.runForeach {
case deleted: DeletedDurableState[UserState] =>
println(s"User ${deleted.persistenceId} deleted at revision ${deleted.revision}")
// Clean up related data
cleanupUserData(deleted.persistenceId)
// Update read models
removeFromReadModel(deleted.persistenceId)
case updated: UpdatedDurableState[UserState] =>
// Handle updates
processUserStateUpdate(updated.value)
}
// Pattern matching with extraction
readJournal
.changes("user-states", offset)
.runForeach {
case DeletedDurableState(persistenceId, revision, offset, timestamp) =>
println(s"Deleted: $persistenceId at revision $revision, timestamp $timestamp")
handleDeletion(persistenceId)
}Main query interface for durable state changes.
/**
* Query API for reading durable state objects.
*/
trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
/**
* Get a source of the most recent changes made to objects with the given tag since the passed in offset.
*
* Note that this only returns the most recent change to each object, if an object has been updated multiple times
* since the offset, only the most recent of those changes will be part of the stream.
*
* This will return changes that occurred up to when the Source returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* @param tag The tag to get changes for
* @param offset The offset to get changes since. Must either be NoOffset to get changes since the beginning of time,
* or an offset that has been previously returned by this query
* @return A source of state changes
*/
def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
/**
* Get a source of the most recent changes made to objects of the given tag since the passed in offset.
*
* The returned source will never terminate, it effectively watches for changes to the objects and emits changes as
* they happen.
*
* Not all changes that occur are guaranteed to be emitted, this call only guarantees that eventually, the most
* recent change for each object since the offset will be emitted. In particular, multiple updates to a given object
* in quick succession are likely to be skipped, with only the last update resulting in a change from this
* source.
*
* @param tag The tag to get changes for
* @param offset The offset to get changes since. Must either be NoOffset to get changes since the beginning of time,
* or an offset that has been previously returned by this query
* @return A source of state changes
*/
def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
}Usage Examples:
import akka.persistence.query.scaladsl.DurableStateStoreQuery
import akka.persistence.query.NoOffset
val stateStore: DurableStateStoreQuery[UserState] = getDurableStateStore()
// Query current state changes (finite stream)
stateStore
.currentChanges("user-states", NoOffset)
.runForeach {
case updated: UpdatedDurableState[UserState] =>
println(s"Current user state: ${updated.value}")
buildInitialProjection(updated.persistenceId, updated.value)
case deleted: DeletedDurableState[UserState] =>
println(s"User ${deleted.persistenceId} is deleted")
}
.onComplete {
case Success(_) => println("Finished processing current states")
case Failure(ex) => println(s"Failed to process current states: $ex")
}
// Query live state changes (infinite stream)
var lastOffset: Offset = NoOffset
stateStore
.changes("user-states", lastOffset)
.runForeach { change =>
change match {
case updated: UpdatedDurableState[UserState] =>
updateReadModel(updated.persistenceId, updated.value)
case deleted: DeletedDurableState[UserState] =>
removeFromReadModel(deleted.persistenceId)
}
// Update offset for resumption
lastOffset = change.offset
saveOffset(lastOffset)
}Query interface for paginated persistence IDs from durable state stores.
/**
* A plugin may optionally support this query by implementing this trait.
*/
trait DurableStateStorePagedPersistenceIdsQuery[A] extends DurableStateStore[A] {
/**
* Get current persistence ids with pagination support.
*
* @param afterId Start after this persistence ID (exclusive)
* @param limit Maximum number of persistence IDs to return
* @return Source of persistence IDs
*/
def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]
}Usage Examples:
import akka.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery
val stateStore: DurableStateStorePagedPersistenceIdsQuery[UserState] = getDurableStateStore()
// Get first page of persistence IDs
stateStore
.currentPersistenceIds(None, 100L)
.runForeach { persistenceId =>
println(s"Found persistence ID: $persistenceId")
// Load current state for this persistence ID
stateStore
.asInstanceOf[DurableStateStore[UserState]]
.getObject(persistenceId)
.foreach {
case Some(GetObjectResult(userState, revision)) =>
println(s"Current state for $persistenceId: $userState")
case None =>
println(s"No state found for $persistenceId")
}
}
// Paginate through all persistence IDs
def processAllPersistenceIds(afterId: Option[String] = None): Future[Unit] = {
stateStore
.currentPersistenceIds(afterId, 100L)
.runWith(Sink.seq)
.flatMap { ids =>
println(s"Processing ${ids.size} persistence IDs")
// Process this batch
ids.foreach(processPersistenceId)
if (ids.size == 100) {
// More pages available, continue with next page
processAllPersistenceIds(ids.lastOption)
} else {
// Last page, we're done
Future.successful(())
}
}
}Slice-based query interface for durable state stores (typed API).
/**
* A plugin may optionally support this query by implementing this trait.
* API May Change
*/
trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
/**
* Get current state changes by slice range.
*/
def currentChangesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset
): Source[DurableStateChange[A], NotUsed]
/**
* Get live state changes by slice range.
*/
def changesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset
): Source[DurableStateChange[A], NotUsed]
/** Get the slice number for a given persistence ID */
def sliceForPersistenceId(persistenceId: String): Int
/** Get slice ranges for distributing the load */
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}Usage Examples:
import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
val stateStore: DurableStateStoreBySliceQuery[UserState] = getTypedDurableStateStore()
// Query state changes by slice range
stateStore
.changesBySlices("User", 0, 255, offset)
.runForeach { change =>
change match {
case updated: UpdatedDurableState[UserState] =>
println(s"User ${updated.persistenceId} updated in slice ${stateStore.sliceForPersistenceId(updated.persistenceId)}")
processUserUpdate(updated.value)
case deleted: DeletedDurableState[UserState] =>
println(s"User ${deleted.persistenceId} deleted")
processUserDeletion(deleted.persistenceId)
}
}
// Distribute state processing across multiple slices
val sliceRanges = stateStore.sliceRanges(4)
sliceRanges.zipWithIndex.foreach { case (range, processorId) =>
println(s"Processor $processorId handles slices ${range.start} to ${range.end}")
stateStore
.changesBySlices("User", range.start, range.end, offset)
.runForeach { change =>
processStateChangeInProcessor(processorId, change)
}
}Build read model projections from state changes:
case class UserProjection(
persistenceId: String,
name: String,
email: String,
lastUpdated: Long,
revision: Long
)
class UserProjectionBuilder(stateStore: DurableStateStoreQuery[UserState]) {
def buildProjections(): Future[Map[String, UserProjection]] = {
stateStore
.currentChanges("user-states", NoOffset)
.runFold(Map.empty[String, UserProjection]) { (projections, change) =>
change match {
case updated: UpdatedDurableState[UserState] =>
val projection = UserProjection(
persistenceId = updated.persistenceId,
name = updated.value.name,
email = updated.value.email,
lastUpdated = updated.timestamp,
revision = updated.revision
)
projections + (updated.persistenceId -> projection)
case deleted: DeletedDurableState[UserState] =>
projections - deleted.persistenceId
}
}
}
def maintainProjections(initialProjections: Map[String, UserProjection]): Unit = {
var projections = initialProjections
var lastOffset: Offset = NoOffset
stateStore
.changes("user-states", lastOffset)
.runForeach { change =>
change match {
case updated: UpdatedDurableState[UserState] =>
val projection = UserProjection(
persistenceId = updated.persistenceId,
name = updated.value.name,
email = updated.value.email,
lastUpdated = updated.timestamp,
revision = updated.revision
)
projections = projections + (updated.persistenceId -> projection)
saveProjection(projection)
case deleted: DeletedDurableState[UserState] =>
projections = projections - deleted.persistenceId
deleteProjection(deleted.persistenceId)
}
lastOffset = change.offset
saveOffset(lastOffset)
}
}
}Filter state changes based on criteria:
// Filter by revision numbers
stateStore
.changes("user-states", offset)
.filter {
case updated: UpdatedDurableState[UserState] => updated.revision > 5L
case deleted: DeletedDurableState[UserState] => deleted.revision > 5L
}
.runForeach(processImportantChange)
// Filter by timestamp (last hour only)
val oneHourAgo = System.currentTimeMillis() - (60 * 60 * 1000)
stateStore
.changes("user-states", offset)
.filter(_.timestamp > oneHourAgo)
.runForeach(processRecentChange)
// Filter by state content
stateStore
.changes("user-states", offset)
.collect {
case updated: UpdatedDurableState[UserState] if updated.value.isActive =>
updated
}
.runForeach(processActiveUserUpdate)Aggregate state changes for analytics:
case class StateChangeMetrics(
totalUpdates: Long,
totalDeletes: Long,
uniqueEntities: Set[String],
lastProcessed: Long
)
def aggregateStateChanges(): Future[StateChangeMetrics] = {
stateStore
.currentChanges("user-states", NoOffset)
.runFold(StateChangeMetrics(0L, 0L, Set.empty, 0L)) { (metrics, change) =>
change match {
case updated: UpdatedDurableState[UserState] =>
metrics.copy(
totalUpdates = metrics.totalUpdates + 1,
uniqueEntities = metrics.uniqueEntities + updated.persistenceId,
lastProcessed = math.max(metrics.lastProcessed, updated.timestamp)
)
case deleted: DeletedDurableState[UserState] =>
metrics.copy(
totalDeletes = metrics.totalDeletes + 1,
uniqueEntities = metrics.uniqueEntities + deleted.persistenceId,
lastProcessed = math.max(metrics.lastProcessed, deleted.timestamp)
)
}
}
}Handle failures in state change processing:
stateStore
.changes("user-states", offset)
.recover {
case ex: Exception =>
println(s"Error in state change stream: $ex")
// Return a placeholder or restart logic
UpdatedDurableState("error", 0L, UserState.empty, NoOffset, System.currentTimeMillis())
}
.runForeach { change =>
try {
processStateChange(change)
} catch {
case ex: Exception =>
println(s"Failed to process state change: $ex")
handleProcessingFailure(change, ex)
}
}Safely handle offset storage and retrieval:
class StateChangeProcessor(stateStore: DurableStateStoreQuery[UserState]) {
private var currentOffset: Offset = loadStoredOffset().getOrElse(NoOffset)
def start(): Unit = {
stateStore
.changes("user-states", currentOffset)
.runForeach { change =>
try {
processChange(change)
currentOffset = change.offset
saveOffset(currentOffset)
} catch {
case ex: Exception =>
println(s"Processing failed, keeping offset at $currentOffset: $ex")
// Don't update offset on failure
}
}
}
def restart(): Unit = {
// Reload offset and restart processing
currentOffset = loadStoredOffset().getOrElse(NoOffset)
start()
}
}Java API equivalents are available in the javadsl package:
import akka.persistence.query.javadsl.DurableStateStoreQuery;
import akka.persistence.query.UpdatedDurableState;
import akka.persistence.query.DeletedDurableState;
// Java API usage
DurableStateStoreQuery<UserState> stateStore = getJavaDurableStateStore();
// Query current changes
stateStore
.currentChanges("user-states", NoOffset.getInstance())
.runForeach(change -> {
if (change instanceof UpdatedDurableState) {
UpdatedDurableState<UserState> updated = (UpdatedDurableState<UserState>) change;
System.out.println("Updated: " + updated.persistenceId());
processUserUpdate(updated.value());
} else if (change instanceof DeletedDurableState) {
DeletedDurableState<UserState> deleted = (DeletedDurableState<UserState>) change;
System.out.println("Deleted: " + deleted.persistenceId());
processUserDeletion(deleted.persistenceId());
}
}, system);case class UserState(
name: String,
email: String,
isActive: Boolean,
preferences: Map[String, String]
) {
def isEmpty: Boolean = name.isEmpty && email.isEmpty
}
object UserState {
val empty: UserState = UserState("", "", false, Map.empty)
}
case class GetObjectResult[A](value: A, revision: Long)
trait DurableStateStore[A] {
def getObject(persistenceId: String): Future[Option[GetObjectResult[A]]]
def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done]
def deleteObject(persistenceId: String): Future[Done]
}
import akka.Done
import scala.collection.immutable
import akka.stream.scaladsl.{Source, Sink}
import akka.NotUsed
import scala.concurrent.Future
import scala.util.{Success, Failure}