or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

durable-state.md event-envelopes.md extension.md index.md journal-implementations.md offsets.md typed-queries.md untyped-queries.md
tile.json

docs/durable-state.md

Durable State Queries

Durable state queries provide interfaces for querying state changes and persistence IDs from durable state stores. These complement event-based queries by focusing on state changes rather than individual events.

Capabilities

Durable State Change Types

Base trait and implementations for representing state changes.

/**
 * Element type of the streams produced by DurableStateStoreQuery.
 * The implementation can be UpdatedDurableState or DeletedDurableState.
 * Not for user extension (the trait is sealed).
 *
 * Only `persistenceId` and `offset` are common to every change; fields such as
 * `revision` and `timestamp` are declared on the concrete subclasses.
 *
 * @tparam A the type of the state value carried by an update
 */
sealed trait DurableStateChange[A] {
  /** The persistence id of the origin entity */
  def persistenceId: String
  
  /** The offset that can be used in next changes or currentChanges query */
  def offset: Offset
}

Updated Durable State

Represents an updated state change with the new value.

/**
 * Updated durable state change containing the new state value.
 *
 * @tparam A the type of the state value
 * @param persistenceId The persistence id of the origin entity
 * @param revision The revision number from the origin entity
 * @param value The new state value
 * @param offset The offset that can be used in next changes or currentChanges query
 * @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
 */
final class UpdatedDurableState[A](
  val persistenceId: String,
  val revision: Long,
  val value: A,
  override val offset: Offset,
  val timestamp: Long
) extends DurableStateChange[A]

Usage Examples:

import akka.persistence.query.{DeletedDurableState, Sequence, UpdatedDurableState}

// Pattern matching on state changes: both variants of the sealed
// DurableStateChange hierarchy are handled, so the match is exhaustive.
readJournal
  .changes("user-states", offset)
  .runForeach {
    case updated: UpdatedDurableState[UserState] =>
      println(s"User ${updated.persistenceId} updated to revision ${updated.revision}")
      println(s"New state: ${updated.value}")
      println(s"Updated at: ${updated.timestamp}")

      // Process the updated state
      processUserStateUpdate(updated.value)

    case deleted: DeletedDurableState[UserState] =>
      println(s"User ${deleted.persistenceId} deleted at revision ${deleted.revision}")
      processUserStateDeletion(deleted.persistenceId)
  }

// Extract fields using unapply
readJournal
  .changes("user-states", offset)
  .runForeach {
    // The change's own offset and timestamp are not needed here, so they are
    // left unbound instead of shadowing the outer `offset`.
    case UpdatedDurableState(persistenceId, revision, value, _, _) =>
      println(s"Updated: $persistenceId at revision $revision")
      updateProjection(persistenceId, value.asInstanceOf[UserState])

    // Deletions are deliberately ignored. Without this case the partial
    // function would throw a MatchError on the first delete and fail the stream.
    case _: DeletedDurableState[_] =>
      ()
  }

Deleted Durable State

Represents a deleted state change.

/**
 * Deleted durable state change indicating the state was removed.
 * Unlike UpdatedDurableState it carries no state value.
 *
 * @tparam A the type of the state value of the origin entity
 * @param persistenceId The persistence id of the origin entity
 * @param revision The revision number from the origin entity
 * @param offset The offset that can be used in next changes or currentChanges query
 * @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
 */
final class DeletedDurableState[A](
  val persistenceId: String,
  val revision: Long,
  override val offset: Offset,
  val timestamp: Long
) extends DurableStateChange[A]

Usage Examples:

import akka.persistence.query.{DeletedDurableState, UpdatedDurableState}

// Handle deleted state
readJournal
  .changes("user-states", offset)
  .runForeach {
    case deleted: DeletedDurableState[UserState] =>
      println(s"User ${deleted.persistenceId} deleted at revision ${deleted.revision}")

      // Clean up related data
      cleanupUserData(deleted.persistenceId)

      // Update read models
      removeFromReadModel(deleted.persistenceId)

    case updated: UpdatedDurableState[UserState] =>
      // Handle updates
      processUserStateUpdate(updated.value)
  }

// Pattern matching with extraction
readJournal
  .changes("user-states", offset)
  .runForeach {
    // The change's own offset is not needed here, so it is left unbound
    // instead of shadowing the outer `offset`.
    case DeletedDurableState(persistenceId, revision, _, timestamp) =>
      println(s"Deleted: $persistenceId at revision $revision, timestamp $timestamp")
      handleDeletion(persistenceId)

    // Updates are deliberately ignored. Without this case the partial
    // function would throw a MatchError on the first update and fail the stream.
    case _: UpdatedDurableState[_] =>
      ()
  }

Durable State Store Query

Main query interface for durable state changes.

/**
 * Query API for reading durable state objects.
 *
 * Adds two tag-based change queries on top of DurableStateStore:
 * `currentChanges` (changes up to materialization) and `changes` (a source that never terminates).
 *
 * @tparam A the type of the state value
 */
trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
  /**
   * Get a source of the most recent changes made to objects with the given tag since the passed in offset.
   *
   * Note that this only returns the most recent change to each object, if an object has been updated multiple times
   * since the offset, only the most recent of those changes will be part of the stream.
   *
   * This will return changes that occurred up to when the Source returned by this call is materialized. Changes to
   * objects made since materialization are not guaranteed to be included in the results.
   *
   * @param tag The tag to get changes for
   * @param offset The offset to get changes since. Must either be NoOffset to get changes since the beginning of time,
   *               or an offset that has been previously returned by this query
   * @return A source of state changes
   */
  def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
  
  /**
   * Get a source of the most recent changes made to objects of the given tag since the passed in offset.
   *
   * The returned source will never terminate, it effectively watches for changes to the objects and emits changes as
   * they happen.
   *
   * Not all changes that occur are guaranteed to be emitted, this call only guarantees that eventually, the most
   * recent change for each object since the offset will be emitted. In particular, multiple updates to a given object
   * in quick succession are likely to be skipped, with only the last update resulting in a change from this
   * source.
   *
   * @param tag The tag to get changes for
   * @param offset The offset to get changes since. Must either be NoOffset to get changes since the beginning of time,
   *               or an offset that has been previously returned by this query
   * @return A source of state changes
   */
  def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
}

Usage Examples:

import akka.persistence.query.scaladsl.DurableStateStoreQuery
import akka.persistence.query.NoOffset

// Store handle used by both queries below.
val stateStore: DurableStateStoreQuery[UserState] = getDurableStateStore()

// Finite stream: walk the changes known at materialization time, then report
// how the run ended.
val currentStatesDone =
  stateStore.currentChanges("user-states", NoOffset).runForeach {
    case deleted: DeletedDurableState[UserState] =>
      println(s"User ${deleted.persistenceId} is deleted")

    case updated: UpdatedDurableState[UserState] =>
      println(s"Current user state: ${updated.value}")
      buildInitialProjection(updated.persistenceId, updated.value)
  }

currentStatesDone.onComplete {
  case Failure(ex) => println(s"Failed to process current states: $ex")
  case Success(_) => println("Finished processing current states")
}

// Infinite stream: keep the read model in sync and remember how far we got.
var lastOffset: Offset = NoOffset

// Records the offset of the change that was just applied so that processing
// can be resumed from it later.
def checkpoint(next: Offset): Unit = {
  lastOffset = next
  saveOffset(lastOffset)
}

stateStore
  .changes("user-states", lastOffset)
  .runForeach { change =>
    change match {
      case deleted: DeletedDurableState[UserState] =>
        removeFromReadModel(deleted.persistenceId)

      case updated: UpdatedDurableState[UserState] =>
        updateReadModel(updated.persistenceId, updated.value)
    }

    checkpoint(change.offset)
  }

Durable State Store Paged Persistence IDs Query

Query interface for paginated persistence IDs from durable state stores.

/**
 * Query for listing the persistence ids held by a durable state store, one page at a time.
 * A plugin may optionally support this query by implementing this trait.
 *
 * @tparam A the type of the state value
 */
trait DurableStateStorePagedPersistenceIdsQuery[A] extends DurableStateStore[A] {
  /**
   * Get current persistence ids with pagination support.
   *
   * To fetch the next page, pass the last id of the previous page as `afterId`.
   *
   * @param afterId Start after this persistence ID (exclusive); None starts from the beginning
   * @param limit Maximum number of persistence IDs to return
   * @return Source of persistence IDs
   */
  def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]
}

Usage Examples:

import akka.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery

val stateStore: DurableStateStorePagedPersistenceIdsQuery[UserState] = getDurableStateStore()

// Get first page of persistence IDs
stateStore
  .currentPersistenceIds(None, 100L)
  .runForeach { persistenceId =>
    println(s"Found persistence ID: $persistenceId")

    // Load current state for this persistence ID. No cast is needed:
    // DurableStateStorePagedPersistenceIdsQuery already extends DurableStateStore,
    // so getObject is available directly.
    stateStore
      .getObject(persistenceId)
      .foreach {
        // The revision is not used here, so it is left unbound.
        case Some(GetObjectResult(userState, _)) =>
          println(s"Current state for $persistenceId: $userState")
        case None =>
          println(s"No state found for $persistenceId")
      }
  }

// Paginate through all persistence IDs
/**
 * Processes every persistence id in the store, one page at a time.
 *
 * Each page is fetched with `currentPersistenceIds`, collected, and handed id by id
 * to `processPersistenceId`. A full page means more ids may follow, so the next page
 * is requested starting after the last id of the current one; a short page ends the walk.
 *
 * @param afterId  fetch ids strictly after this one; None starts from the beginning
 * @param pageSize maximum number of ids fetched per page; must be positive
 * @return a Future completed once the last page has been processed
 * @throws IllegalArgumentException if `pageSize` is not positive
 */
def processAllPersistenceIds(afterId: Option[String] = None, pageSize: Long = 100L): Future[Unit] = {
  // A non-positive page size would make every (empty) page look "full" and loop forever.
  require(pageSize > 0, s"pageSize must be positive, was $pageSize")

  stateStore
    .currentPersistenceIds(afterId, pageSize)
    .runWith(Sink.seq)
    .flatMap { ids =>
      println(s"Processing ${ids.size} persistence IDs")

      // Process this batch
      ids.foreach(processPersistenceId)

      if (ids.size >= pageSize) {
        // Full page: more ids may be available, continue after the last one seen
        processAllPersistenceIds(ids.lastOption, pageSize)
      } else {
        // Short page: this was the last one
        Future.successful(())
      }
    }
}

Durable State Store by Slice Query

Slice-based query interface for durable state stores (typed API).

/**
 * Query for reading state changes of one entity type, restricted to a range of slices.
 * A plugin may optionally support this query by implementing this trait.
 * API May Change
 *
 * @tparam A the type of the state value
 */
trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
  /**
   * Get current state changes by slice range.
   *
   * @param entityType the entity type to get changes for
   * @param minSlice lower bound of the slice range (presumably inclusive — TODO confirm)
   * @param maxSlice upper bound of the slice range (presumably inclusive — TODO confirm)
   * @param offset the offset to get changes since
   * @return a source of state changes
   */
  def currentChangesBySlices(
    entityType: String,
    minSlice: Int,
    maxSlice: Int,
    offset: Offset
  ): Source[DurableStateChange[A], NotUsed]
  
  /**
   * Get live state changes by slice range.
   *
   * @param entityType the entity type to get changes for
   * @param minSlice lower bound of the slice range (presumably inclusive — TODO confirm)
   * @param maxSlice upper bound of the slice range (presumably inclusive — TODO confirm)
   * @param offset the offset to get changes since
   * @return a source of state changes
   */
  def changesBySlices(
    entityType: String,
    minSlice: Int,
    maxSlice: Int,
    offset: Offset
  ): Source[DurableStateChange[A], NotUsed]
  
  /** Get the slice number for a given persistence ID */
  def sliceForPersistenceId(persistenceId: String): Int
  
  /** Get slice ranges for distributing the load */
  def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}

Usage Examples:

import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery

val stateStore: DurableStateStoreBySliceQuery[UserState] = getTypedDurableStateStore()

// Query state changes by slice range
stateStore
  .changesBySlices("User", 0, 255, offset)
  .runForeach { change =>
    change match {
      case updated: UpdatedDurableState[UserState] =>
        println(s"User ${updated.persistenceId} updated in slice ${stateStore.sliceForPersistenceId(updated.persistenceId)}")
        processUserUpdate(updated.value)

      case deleted: DeletedDurableState[UserState] =>
        println(s"User ${deleted.persistenceId} deleted")
        processUserDeletion(deleted.persistenceId)
    }
  }

// Distribute state processing across multiple slices
val sliceRanges = stateStore.sliceRanges(4)
sliceRanges.zipWithIndex.foreach { case (range, processorId) =>
  // Use min/max rather than start/end: for an exclusive Range (`a until b`),
  // `end` is one past the last contained slice, which would make adjacent
  // processors overlap. min/max are the first and last contained slice for
  // both inclusive and exclusive ranges.
  val minSlice = range.min
  val maxSlice = range.max
  println(s"Processor $processorId handles slices $minSlice to $maxSlice")

  stateStore
    .changesBySlices("User", minSlice, maxSlice, offset)
    .runForeach { change =>
      processStateChangeInProcessor(processorId, change)
    }
}

State Processing Patterns

State Projection Building

Build read model projections from state changes:

/** Read-model view of a user, derived from the latest stored UserState. */
case class UserProjection(
  persistenceId: String,
  name: String,
  email: String,
  lastUpdated: Long,
  revision: Long
)

/**
 * Builds and maintains UserProjection read models from the "user-states" change streams
 * of the given store.
 */
class UserProjectionBuilder(stateStore: DurableStateStoreQuery[UserState]) {

  /** Maps an update to its projection; shared by the initial build and the live maintenance. */
  private def toProjection(updated: UpdatedDurableState[UserState]): UserProjection =
    UserProjection(
      persistenceId = updated.persistenceId,
      name = updated.value.name,
      email = updated.value.email,
      lastUpdated = updated.timestamp,
      revision = updated.revision
    )

  /**
   * Folds the current changes (from NoOffset) into a map keyed by persistence id:
   * updates add or replace an entry, deletions remove it.
   *
   * @return the projections once the finite change stream has completed
   */
  def buildProjections(): Future[Map[String, UserProjection]] = {
    stateStore
      .currentChanges("user-states", NoOffset)
      .runFold(Map.empty[String, UserProjection]) { (projections, change) =>
        change match {
          case updated: UpdatedDurableState[UserState] =>
            projections + (updated.persistenceId -> toProjection(updated))

          case deleted: DeletedDurableState[UserState] =>
            projections - deleted.persistenceId
        }
      }
  }

  /**
   * Follows the live change stream (from NoOffset), applying each change to the in-memory
   * map and to the persisted projections, and saving the offset after every change.
   *
   * @param initialProjections the projections to start from
   */
  def maintainProjections(initialProjections: Map[String, UserProjection]): Unit = {
    var projections = initialProjections
    var lastOffset: Offset = NoOffset

    stateStore
      .changes("user-states", lastOffset)
      .runForeach { change =>
        change match {
          case updated: UpdatedDurableState[UserState] =>
            val projection = toProjection(updated)
            projections = projections + (updated.persistenceId -> projection)
            saveProjection(projection)

          case deleted: DeletedDurableState[UserState] =>
            projections = projections - deleted.persistenceId
            deleteProjection(deleted.persistenceId)
        }

        // Record progress only after the change has been applied
        lastOffset = change.offset
        saveOffset(lastOffset)
      }
  }
}

State Change Filtering

Filter state changes based on criteria:

// Filter by revision numbers
stateStore
  .changes("user-states", offset)
  .filter {
    case updated: UpdatedDurableState[UserState] => updated.revision > 5L
    case deleted: DeletedDurableState[UserState] => deleted.revision > 5L
  }
  .runForeach(processImportantChange)

// Filter by timestamp (last hour only)
val oneHourAgo = System.currentTimeMillis() - (60 * 60 * 1000)

// `timestamp` is declared on the concrete change classes, not on
// DurableStateChange itself, so it has to be read through a pattern match.
stateStore
  .changes("user-states", offset)
  .filter {
    case updated: UpdatedDurableState[UserState] => updated.timestamp > oneHourAgo
    case deleted: DeletedDurableState[UserState] => deleted.timestamp > oneHourAgo
  }
  .runForeach(processRecentChange)

// Filter by state content
stateStore
  .changes("user-states", offset)
  .collect {
    case updated: UpdatedDurableState[UserState] if updated.value.isActive =>
      updated
  }
  .runForeach(processActiveUserUpdate)

State Change Aggregation

Aggregate state changes for analytics:

/** Running totals gathered while scanning state changes. */
case class StateChangeMetrics(
  totalUpdates: Long,
  totalDeletes: Long,
  uniqueEntities: Set[String],
  lastProcessed: Long
)

/**
 * Scans the current "user-states" changes from NoOffset and tallies updates, deletions,
 * the set of persistence ids seen, and the largest timestamp encountered.
 *
 * @return the metrics once the finite change stream has completed
 */
def aggregateStateChanges(): Future[StateChangeMetrics] = {
  val zero = StateChangeMetrics(
    totalUpdates = 0L,
    totalDeletes = 0L,
    uniqueEntities = Set.empty,
    lastProcessed = 0L
  )

  stateStore.currentChanges("user-states", NoOffset).runFold(zero) { (acc, change) =>
    change match {
      case deleted: DeletedDurableState[UserState] =>
        StateChangeMetrics(
          acc.totalUpdates,
          acc.totalDeletes + 1,
          acc.uniqueEntities + deleted.persistenceId,
          math.max(acc.lastProcessed, deleted.timestamp)
        )

      case updated: UpdatedDurableState[UserState] =>
        StateChangeMetrics(
          acc.totalUpdates + 1,
          acc.totalDeletes,
          acc.uniqueEntities + updated.persistenceId,
          math.max(acc.lastProcessed, updated.timestamp)
        )
    }
  }
}

Error Handling

Change Processing Failures

Handle failures in state change processing:

import akka.stream.scaladsl.Source
import scala.util.control.NonFatal

stateStore
  .changes("user-states", offset)
  // On an upstream failure, log it and complete the stream instead of emitting
  // a made-up "error" change: a placeholder element would be handed to
  // processStateChange below as if it were a real state update.
  .recoverWithRetries(1, {
    case NonFatal(ex) =>
      println(s"Error in state change stream: $ex")
      Source.empty[DurableStateChange[UserState]]
  })
  .runForeach { change =>
    // A failure while handling one change is reported and handed to the
    // failure handler; the stream keeps running.
    try {
      processStateChange(change)
    } catch {
      case ex: Exception =>
        println(s"Failed to process state change: $ex")
        handleProcessingFailure(change, ex)
    }
  }

Offset Management

Safely handle offset storage and retrieval:

/**
 * Processes the live "user-states" change stream and persists the offset after every
 * successfully processed change, so that processing can resume where it stopped.
 *
 * A processing failure stops the stream rather than being skipped: if the stream kept
 * running, the next successful change would advance the stored offset past the failed
 * one and the failed change would never be retried. Call `restart()` to resume from the
 * last stored offset.
 */
class StateChangeProcessor(stateStore: DurableStateStoreQuery[UserState]) {
  import akka.stream.{KillSwitches, UniqueKillSwitch}
  import akka.stream.scaladsl.Keep
  import scala.util.control.NonFatal

  private var currentOffset: Offset = loadStoredOffset().getOrElse(NoOffset)

  // Handle on the running stream, kept so restart() can stop it before starting a new one.
  private var activeStream: Option[UniqueKillSwitch] = None

  /** Starts processing changes from the current offset. */
  def start(): Unit = {
    val killSwitch = stateStore
      .changes("user-states", currentOffset)
      .viaMat(KillSwitches.single)(Keep.right)
      .map { change =>
        try {
          processChange(change)
          currentOffset = change.offset
          saveOffset(currentOffset)
        } catch {
          case NonFatal(ex) =>
            println(s"Processing failed, keeping offset at $currentOffset: $ex")
            // Rethrow so the stream fails here and the offset is not advanced
            // past the change that could not be processed.
            throw ex
        }
      }
      .to(Sink.ignore)
      .run()

    activeStream = Some(killSwitch)
  }

  /** Stops the running stream, if any, reloads the stored offset and starts again. */
  def restart(): Unit = {
    // Without this, every restart would add another stream processing the same changes.
    activeStream.foreach(_.shutdown())
    activeStream = None

    // Reload offset and restart processing
    currentOffset = loadStoredOffset().getOrElse(NoOffset)
    start()
  }
}

Java API

Java API equivalents are available in the javadsl package:

import akka.persistence.query.javadsl.DurableStateStoreQuery;
import akka.persistence.query.NoOffset;
import akka.persistence.query.UpdatedDurableState;
import akka.persistence.query.DeletedDurableState;

// Java API usage
DurableStateStoreQuery<UserState> stateStore = getJavaDurableStateStore();

// Query current changes, starting from the beginning (NoOffset)
stateStore
    .currentChanges("user-states", NoOffset.getInstance())
    .runForeach(change -> {
        // A change is either an update (carries the new value) or a deletion
        if (change instanceof UpdatedDurableState) {
            UpdatedDurableState<UserState> updated = (UpdatedDurableState<UserState>) change;
            System.out.println("Updated: " + updated.persistenceId());
            processUserUpdate(updated.value());
        } else if (change instanceof DeletedDurableState) {
            DeletedDurableState<UserState> deleted = (DeletedDurableState<UserState>) change;
            System.out.println("Deleted: " + deleted.persistenceId());
            processUserDeletion(deleted.persistenceId());
        }
    }, system);

Types

/**
 * Example state stored for a user entity.
 *
 * @param name        the user's name
 * @param email       the user's email address
 * @param isActive    whether the user is active
 * @param preferences free-form key/value preferences
 */
case class UserState(
  name: String,
  email: String,
  isActive: Boolean,
  preferences: Map[String, String]
) {
  /** True when both the name and the email are empty strings. */
  def isEmpty: Boolean = !(name.nonEmpty || email.nonEmpty)
}

object UserState {
  /** A blank, inactive user with no preferences. */
  val empty: UserState =
    UserState(name = "", email = "", isActive = false, preferences = Map.empty)
}

/**
 * Result of looking up a stored object: the state value together with its revision.
 *
 * NOTE(review): Akka's own GetObjectResult appears to be declared as
 * (value: Option[A], revision: Long) — confirm which shape this page intends.
 */
case class GetObjectResult[A](value: A, revision: Long)

/**
 * Basic read/write operations on a durable state store, keyed by persistence id.
 *
 * NOTE(review): in Akka, getObject appears to return Future[GetObjectResult[A]], and the
 * upsert/delete operations appear to live on DurableStateUpdateStore — verify against the
 * Akka API documentation before relying on these signatures.
 *
 * @tparam A the type of the state value
 */
trait DurableStateStore[A] {
  /** Looks up the object stored for `persistenceId`; the Option is None when nothing is found. */
  def getObject(persistenceId: String): Future[Option[GetObjectResult[A]]]
  /** Stores `value` for `persistenceId` at the given `revision`, labelled with `tag`. */
  def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done]
  /** Deletes the object stored for `persistenceId`. */
  def deleteObject(persistenceId: String): Future[Done]
}

import akka.Done
import scala.collection.immutable
import akka.stream.scaladsl.{Source, Sink}
import akka.NotUsed
import scala.concurrent.Future
import scala.util.{Success, Failure}