Universal asynchronous stream-based query interface for querying persisted events and state changes from various journal backends with support for CQRS and event sourcing patterns.
Durable state queries provide interfaces for querying state changes and persistence IDs from durable state stores. These complement event-based queries by focusing on state changes rather than individual events.
Base trait and implementations for representing state changes.
/**
 * Element type emitted by the change streams of `DurableStateStoreQuery`.
 *
 * The concrete implementations are `UpdatedDurableState` and `DeletedDurableState`.
 * The trait is sealed and is not meant for user extension.
 *
 * @tparam A type parameter shared with the implementing change classes
 */
sealed trait DurableStateChange[A] {

  /** Persistence id of the entity this change originates from. */
  def persistenceId: String

  /** Offset of this change; it can be passed to a following `changes` or `currentChanges` query. */
  def offset: Offset
}

Represents an updated state change with the new value.
/**
 * A [[DurableStateChange]] signalling that an entity's state was updated; it carries the new value.
 *
 * @param persistenceId persistence id of the origin entity
 * @param revision revision number from the origin entity
 * @param value the new state value
 * @param offset offset that can be used in a following `changes` or `currentChanges` query
 * @param timestamp time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
 * @tparam A type of the state value
 */
final class UpdatedDurableState[A](
    val persistenceId: String,
    val revision: Long,
    val value: A,
    override val offset: Offset,
    val timestamp: Long
) extends DurableStateChange[A]

Usage Examples:
import akka.persistence.query.{DeletedDurableState, Sequence, UpdatedDurableState}

// Pattern matching on state changes. The stream emits both kinds of change,
// so both implementations are handled.
readJournal
  .changes("user-states", offset)
  .runForeach {
    case updated: UpdatedDurableState[UserState] =>
      println(s"User ${updated.persistenceId} updated to revision ${updated.revision}")
      println(s"New state: ${updated.value}")
      println(s"Updated at: ${updated.timestamp}")
      // Process the updated state
      processUserStateUpdate(updated.value)
    case deleted: DeletedDurableState[UserState] =>
      println(s"User ${deleted.persistenceId} deleted at revision ${deleted.revision}")
      processUserStateDeletion(deleted.persistenceId)
  }

// Extract fields using unapply
readJournal
  .changes("user-states", offset)
  .runForeach {
    // The typed sub-pattern `value: UserState` replaces a downcast; the unused offset and
    // timestamp are ignored so the outer `offset` is not shadowed.
    case UpdatedDurableState(persistenceId, revision, value: UserState, _, _) =>
      println(s"Updated: $persistenceId at revision $revision")
      updateProjection(persistenceId, value)
    // Deletions are not projected here, but they still have to be matched: an unmatched
    // element would raise a MatchError inside the stream.
    case _: DeletedDurableState[_] =>
      ()
  }

Represents a deleted state change.
/**
 * A [[DurableStateChange]] signalling that an entity's state was removed.
 *
 * @param persistenceId persistence id of the origin entity
 * @param revision revision number from the origin entity
 * @param offset offset that can be used in a following `changes` or `currentChanges` query
 * @param timestamp time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
 * @tparam A type parameter of the [[DurableStateChange]] being implemented; no value of this
 *           type is carried by a deletion
 */
final class DeletedDurableState[A](
    val persistenceId: String,
    val revision: Long,
    override val offset: Offset,
    val timestamp: Long
) extends DurableStateChange[A]

Usage Examples:
import akka.persistence.query.{DeletedDurableState, UpdatedDurableState}

// Handle deleted state
readJournal
  .changes("user-states", offset)
  .runForeach {
    case deleted: DeletedDurableState[UserState] =>
      println(s"User ${deleted.persistenceId} deleted at revision ${deleted.revision}")
      // Clean up related data
      cleanupUserData(deleted.persistenceId)
      // Update read models
      removeFromReadModel(deleted.persistenceId)
    case updated: UpdatedDurableState[UserState] =>
      // Handle updates
      processUserStateUpdate(updated.value)
  }

// Pattern matching with extraction
readJournal
  .changes("user-states", offset)
  .runForeach {
    // The offset field is ignored so the outer `offset` is not shadowed.
    case DeletedDurableState(persistenceId, revision, _, timestamp) =>
      println(s"Deleted: $persistenceId at revision $revision, timestamp $timestamp")
      handleDeletion(persistenceId)
    // Updates are of no interest here, but they still have to be matched: an unmatched
    // element would raise a MatchError inside the stream.
    case _: UpdatedDurableState[_] =>
      ()
  }

Main query interface for durable state changes.
/**
 * Query API for reading durable state objects, layered on top of [[DurableStateStore]].
 */
trait DurableStateStoreQuery[A] extends DurableStateStore[A] {

  /**
   * Streams the most recent change of every object with the given tag that changed since `offset`.
   *
   * Only the latest change per object is part of the stream: an object updated several times
   * since the offset contributes just the most recent of those changes.
   *
   * The stream covers changes that occurred up to the moment the returned `Source` is
   * materialized. Changes made after materialization are not guaranteed to be included.
   *
   * @param tag the tag to get changes for
   * @param offset the offset to get changes since; either `NoOffset` (changes since the beginning
   *               of time) or an offset previously returned by this query
   * @return a source of state changes
   */
  def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]

  /**
   * Live counterpart of [[currentChanges]]: streams the most recent changes of objects with the
   * given tag since `offset` and keeps watching for new ones.
   *
   * The returned source never terminates; changes are emitted as they happen.
   *
   * Not every change is guaranteed to be emitted. The only guarantee is that, eventually, the
   * most recent change of each object since the offset is emitted. Several updates to one object
   * in quick succession are likely to be collapsed into a single element for the last update.
   *
   * @param tag the tag to get changes for
   * @param offset the offset to get changes since; either `NoOffset` (changes since the beginning
   *               of time) or an offset previously returned by this query
   * @return a source of state changes
   */
  def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
}

Usage Examples:
import akka.persistence.query.scaladsl.DurableStateStoreQuery
import akka.persistence.query.{DeletedDurableState, NoOffset, Offset, UpdatedDurableState}
import scala.util.{Failure, Success}

val stateStore: DurableStateStoreQuery[UserState] = getDurableStateStore()

// Query current state changes (finite stream).
// `onComplete` on the resulting Future needs an implicit ExecutionContext in scope.
stateStore
  .currentChanges("user-states", NoOffset)
  .runForeach {
    case updated: UpdatedDurableState[UserState] =>
      println(s"Current user state: ${updated.value}")
      buildInitialProjection(updated.persistenceId, updated.value)
    case deleted: DeletedDurableState[UserState] =>
      println(s"User ${deleted.persistenceId} is deleted")
  }
  .onComplete {
    case Success(_) => println("Finished processing current states")
    case Failure(ex) => println(s"Failed to process current states: $ex")
  }

// Query live state changes (infinite stream)
var lastOffset: Offset = NoOffset
stateStore
  .changes("user-states", lastOffset)
  .runForeach { change =>
    change match {
      case updated: UpdatedDurableState[UserState] =>
        updateReadModel(updated.persistenceId, updated.value)
      case deleted: DeletedDurableState[UserState] =>
        removeFromReadModel(deleted.persistenceId)
    }
    // Update offset for resumption
    lastOffset = change.offset
    saveOffset(lastOffset)
  }

Query interface for paginated persistence IDs from durable state stores.
/**
 * Optional query capability: a plugin may support it by implementing this trait.
 */
trait DurableStateStorePagedPersistenceIdsQuery[A] extends DurableStateStore[A] {

  /**
   * Streams one page of the current persistence ids.
   *
   * @param afterId start after this persistence id (exclusive)
   * @param limit maximum number of persistence ids to return
   * @return source of persistence ids
   */
  def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]
}

Usage Examples:
import akka.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery

val stateStore: DurableStateStorePagedPersistenceIdsQuery[UserState] = getDurableStateStore()

// Get first page of persistence IDs
stateStore
  .currentPersistenceIds(None, 100L)
  .runForeach { persistenceId =>
    println(s"Found persistence ID: $persistenceId")
    // Load current state for this persistence ID. DurableStateStorePagedPersistenceIdsQuery
    // extends DurableStateStore, so getObject is available without a cast.
    stateStore
      .getObject(persistenceId)
      .foreach {
        case Some(GetObjectResult(userState, _)) =>
          println(s"Current state for $persistenceId: $userState")
        case None =>
          println(s"No state found for $persistenceId")
      }
  }
// Paginate through all persistence IDs
/**
 * Processes every persistence id page by page, starting after `afterId`.
 *
 * @param afterId persistence id to start after (exclusive); `None` starts from the first page
 * @param pageSize number of ids requested per page; must be positive
 * @return a future that completes once the last page has been processed
 */
def processAllPersistenceIds(afterId: Option[String] = None, pageSize: Long = 100L): Future[Unit] = {
  // A non-positive page size would make an empty page look like a full one and recurse forever.
  require(pageSize > 0, s"pageSize must be positive, was $pageSize")
  stateStore
    .currentPersistenceIds(afterId, pageSize)
    .runWith(Sink.seq)
    .flatMap { ids =>
      println(s"Processing ${ids.size} persistence IDs")
      // Process this batch
      ids.foreach(processPersistenceId)
      if (ids.size.toLong == pageSize) {
        // A full page means more pages may be available: continue after the last id seen
        processAllPersistenceIds(ids.lastOption, pageSize)
      } else {
        // Short page: this was the last one, we're done
        Future.unit
      }
    }
}

Slice-based query interface for durable state stores (typed API).
/**
 * Optional query capability: a plugin may support it by implementing this trait.
 *
 * API May Change.
 */
trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {

  /**
   * Current state changes for a range of slices.
   *
   * NOTE(review): `minSlice` and `maxSlice` are presumably inclusive bounds — confirm against
   * the plugin documentation.
   */
  def currentChangesBySlices(
      entityType: String,
      minSlice: Int,
      maxSlice: Int,
      offset: Offset
  ): Source[DurableStateChange[A], NotUsed]

  /**
   * Live state changes for a range of slices.
   *
   * NOTE(review): `minSlice` and `maxSlice` are presumably inclusive bounds — confirm against
   * the plugin documentation.
   */
  def changesBySlices(
      entityType: String,
      minSlice: Int,
      maxSlice: Int,
      offset: Offset
  ): Source[DurableStateChange[A], NotUsed]

  /** Slice number for the given persistence id. */
  def sliceForPersistenceId(persistenceId: String): Int

  /** Slice ranges that can be used to distribute the load over `numberOfRanges` consumers. */
  def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}

Usage Examples:
import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
import akka.persistence.query.{DeletedDurableState, UpdatedDurableState}

val stateStore: DurableStateStoreBySliceQuery[UserState] = getTypedDurableStateStore()

// Query state changes by slice range
stateStore
  .changesBySlices("User", 0, 255, offset)
  .runForeach {
    case updated: UpdatedDurableState[UserState] =>
      println(s"User ${updated.persistenceId} updated in slice ${stateStore.sliceForPersistenceId(updated.persistenceId)}")
      processUserUpdate(updated.value)
    case deleted: DeletedDurableState[UserState] =>
      println(s"User ${deleted.persistenceId} deleted")
      processUserDeletion(deleted.persistenceId)
  }

// Distribute state processing across multiple slices
val sliceRanges = stateStore.sliceRanges(4)
sliceRanges.zipWithIndex.foreach { case (range, processorId) =>
  // Use the first and last slice actually contained in the range. `Range.end` is exclusive
  // for ranges built with `until`, so it could point one slice past the range, whereas
  // `min`/`max` are correct for both `to` and `until` ranges.
  val minSlice = range.min
  val maxSlice = range.max
  println(s"Processor $processorId handles slices $minSlice to $maxSlice")
  stateStore
    .changesBySlices("User", minSlice, maxSlice, offset)
    .runForeach { change =>
      processStateChangeInProcessor(processorId, change)
    }
}

Build read model projections from state changes:
/**
 * Read-model row for a single user.
 *
 * @param persistenceId persistence id of the user entity
 * @param name user name
 * @param email user email address
 * @param lastUpdated timestamp value of the change this projection was built from
 * @param revision revision value of the change this projection was built from
 */
case class UserProjection(
    persistenceId: String,
    name: String,
    email: String,
    lastUpdated: Long,
    revision: Long
)
/**
 * Builds and maintains [[UserProjection]] read models from the change streams of a
 * `DurableStateStoreQuery[UserState]`.
 *
 * @param stateStore the durable state store to query for "user-states" changes
 */
class UserProjectionBuilder(stateStore: DurableStateStoreQuery[UserState]) {

  /** Maps an updated state change onto its read-model row. */
  private def toProjection(updated: UpdatedDurableState[UserState]): UserProjection =
    UserProjection(
      persistenceId = updated.persistenceId,
      name = updated.value.name,
      email = updated.value.email,
      lastUpdated = updated.timestamp,
      revision = updated.revision
    )

  /**
   * Folds the current changes into a map of projections keyed by persistence id.
   * Updates add or replace an entry; deletions remove it.
   */
  def buildProjections(): Future[Map[String, UserProjection]] = {
    stateStore
      .currentChanges("user-states", NoOffset)
      .runFold(Map.empty[String, UserProjection]) { (projections, change) =>
        change match {
          case updated: UpdatedDurableState[UserState] =>
            projections + (updated.persistenceId -> toProjection(updated))
          case deleted: DeletedDurableState[UserState] =>
            projections - deleted.persistenceId
        }
      }
  }

  /**
   * Follows the live change stream, applying each change to the in-memory projections and to
   * the persisted ones, and saving the offset after every change.
   *
   * The stream is started from `NoOffset`; the future returned by `runForeach` is discarded.
   *
   * @param initialProjections the projections to start from
   */
  def maintainProjections(initialProjections: Map[String, UserProjection]): Unit = {
    var projections = initialProjections
    var lastOffset: Offset = NoOffset
    stateStore
      .changes("user-states", lastOffset)
      .runForeach { change =>
        change match {
          case updated: UpdatedDurableState[UserState] =>
            val projection = toProjection(updated)
            projections = projections + (updated.persistenceId -> projection)
            saveProjection(projection)
          case deleted: DeletedDurableState[UserState] =>
            projections = projections - deleted.persistenceId
            deleteProjection(deleted.persistenceId)
        }
        lastOffset = change.offset
        saveOffset(lastOffset)
      }
  }
}

Filter state changes based on criteria:
// Filter by revision numbers
stateStore
  .changes("user-states", offset)
  .filter {
    case updated: UpdatedDurableState[UserState] => updated.revision > 5L
    case deleted: DeletedDurableState[UserState] => deleted.revision > 5L
  }
  .runForeach(processImportantChange)

// Filter by timestamp (last hour only).
// `timestamp` is declared on UpdatedDurableState and DeletedDurableState, not on the
// DurableStateChange trait, so it has to be read through a pattern match.
val oneHourAgo = System.currentTimeMillis() - (60 * 60 * 1000)
stateStore
  .changes("user-states", offset)
  .filter {
    case updated: UpdatedDurableState[UserState] => updated.timestamp > oneHourAgo
    case deleted: DeletedDurableState[UserState] => deleted.timestamp > oneHourAgo
  }
  .runForeach(processRecentChange)

// Filter by state content
stateStore
  .changes("user-states", offset)
  .collect {
    case updated: UpdatedDurableState[UserState] if updated.value.isActive =>
      updated
  }
  .runForeach(processActiveUserUpdate)

Aggregate state changes for analytics:
/**
 * Aggregated counters over a stream of state changes.
 *
 * @param totalUpdates number of update changes counted
 * @param totalDeletes number of delete changes counted
 * @param uniqueEntities persistence ids seen in the counted changes
 * @param lastProcessed largest change timestamp seen so far
 */
case class StateChangeMetrics(
    totalUpdates: Long,
    totalDeletes: Long,
    uniqueEntities: Set[String],
    lastProcessed: Long
)
/**
 * Folds the current "user-states" changes into a [[StateChangeMetrics]] summary: counts updates
 * and deletes, collects the persistence ids seen, and tracks the largest timestamp.
 */
def aggregateStateChanges(): Future[StateChangeMetrics] = {
  val emptyMetrics = StateChangeMetrics(0L, 0L, Set.empty, 0L)
  stateStore
    .currentChanges("user-states", NoOffset)
    .runFold(emptyMetrics) {
      case (acc, upd: UpdatedDurableState[UserState]) =>
        acc.copy(
          totalUpdates = acc.totalUpdates + 1,
          uniqueEntities = acc.uniqueEntities + upd.persistenceId,
          lastProcessed = math.max(acc.lastProcessed, upd.timestamp)
        )
      case (acc, del: DeletedDurableState[UserState]) =>
        acc.copy(
          totalDeletes = acc.totalDeletes + 1,
          uniqueEntities = acc.uniqueEntities + del.persistenceId,
          lastProcessed = math.max(acc.lastProcessed, del.timestamp)
        )
    }
}

Handle failures in state change processing:
stateStore
  .changes("user-states", offset)
  .recover {
    // NonFatal lets fatal errors such as OutOfMemoryError or InterruptedException propagate
    // instead of being replaced by a placeholder.
    case scala.util.control.NonFatal(ex) =>
      println(s"Error in state change stream: $ex")
      // Return a placeholder or restart logic.
      // UpdatedDurableState is a plain final class, so it is instantiated with `new`.
      // NOTE(review): the placeholder is emitted downstream like a real change, so
      // processStateChange below receives it — confirm that is intended.
      new UpdatedDurableState[UserState]("error", 0L, UserState.empty, NoOffset, System.currentTimeMillis())
  }
  .runForeach { change =>
    try {
      processStateChange(change)
    } catch {
      case ex: Exception =>
        println(s"Failed to process state change: $ex")
        handleProcessingFailure(change, ex)
    }
  }

Safely handle offset storage and retrieval:
/**
 * Processes live "user-states" changes and persists the offset of every change that was
 * processed successfully.
 *
 * @param stateStore the durable state store whose changes are processed
 */
class StateChangeProcessor(stateStore: DurableStateStoreQuery[UserState]) {

  // Offset of the last successfully processed change; initialised from storage, or NoOffset
  // when nothing has been stored yet.
  private var currentOffset: Offset = loadStoredOffset().getOrElse(NoOffset)

  /** Starts a change stream from `currentOffset`; the resulting future is discarded. */
  def start(): Unit = {
    stateStore
      .changes("user-states", currentOffset)
      .runForeach { change =>
        try {
          processChange(change)
          currentOffset = change.offset
          saveOffset(currentOffset)
        } catch {
          // NonFatal lets fatal errors (OutOfMemoryError, InterruptedException, ...) propagate.
          case scala.util.control.NonFatal(ex) =>
            println(s"Processing failed, keeping offset at $currentOffset: $ex")
            // Don't update offset on failure.
            // NOTE(review): the stream keeps running, so the next successfully processed
            // change advances the offset past the failed one — confirm that is acceptable.
        }
      }
  }

  /**
   * Reloads the stored offset and starts processing again.
   *
   * NOTE(review): this does not stop a stream previously started by `start()` — confirm it is
   * only called once the earlier stream has terminated.
   */
  def restart(): Unit = {
    // Reload offset and restart processing
    currentOffset = loadStoredOffset().getOrElse(NoOffset)
    start()
  }
}

Java API equivalents are available in the javadsl package:
import akka.persistence.query.javadsl.DurableStateStoreQuery;
import akka.persistence.query.NoOffset;
import akka.persistence.query.UpdatedDurableState;
import akka.persistence.query.DeletedDurableState;

// Java API usage
DurableStateStoreQuery<UserState> stateStore = getJavaDurableStateStore();

// Query current changes
stateStore
    .currentChanges("user-states", NoOffset.getInstance())
    .runForeach(change -> {
        if (change instanceof UpdatedDurableState) {
            UpdatedDurableState<UserState> updated = (UpdatedDurableState<UserState>) change;
            System.out.println("Updated: " + updated.persistenceId());
            processUserUpdate(updated.value());
        } else if (change instanceof DeletedDurableState) {
            DeletedDurableState<UserState> deleted = (DeletedDurableState<UserState>) change;
            System.out.println("Deleted: " + deleted.persistenceId());
            processUserDeletion(deleted.persistenceId());
        }
    }, system);

/**
 * Example state type used by the usage snippets.
 *
 * @param name user name
 * @param email user email address
 * @param isActive whether the user is active
 * @param preferences free-form user preferences
 */
case class UserState(
    name: String,
    email: String,
    isActive: Boolean,
    preferences: Map[String, String]
) {

  /** True when both `name` and `email` are empty strings. */
  def isEmpty: Boolean = name.isEmpty && email.isEmpty
}

object UserState {

  /** State with empty name and email, inactive, and without preferences. */
  val empty: UserState = UserState(name = "", email = "", isActive = false, preferences = Map.empty)
}
/**
 * A state value together with its revision, as returned by `DurableStateStore.getObject`.
 *
 * NOTE(review): this listing may be simplified relative to the library's actual
 * `GetObjectResult` — confirm against the Akka API documentation.
 *
 * @param value the stored state value
 * @param revision the revision of the stored state
 * @tparam A type of the state value
 */
case class GetObjectResult[A](
    value: A,
    revision: Long
)
/**
 * Basic operations on durable state objects, addressed by persistence id.
 *
 * NOTE(review): this listing may be simplified relative to the library's actual
 * `DurableStateStore` API — confirm against the Akka API documentation.
 *
 * @tparam A type of the state value
 */
trait DurableStateStore[A] {

  /** Looks up the object stored for `persistenceId`; the option is empty when there is none. */
  def getObject(persistenceId: String): Future[Option[GetObjectResult[A]]]

  /** Stores `value` for `persistenceId` with the given `revision` and `tag`. */
  def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done]

  /** Deletes the object stored for `persistenceId`. */
  def deleteObject(persistenceId: String): Future[Done]
}
import akka.Done
import scala.collection.immutable
import akka.stream.scaladsl.{Source, Sink}
import akka.NotUsed
import scala.concurrent.Future
import scala.util.{Success, Failure}

Install with Tessl CLI:

npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-query-2-13