or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

durable-state.mdevent-envelopes.mdextension.mdindex.mdjournal-implementations.mdoffsets.mdtyped-queries.mduntyped-queries.md
tile.json

typed-queries.mddocs/

Typed Query API

The typed query API provides enhanced type-safe query interfaces with improved event envelopes and slice-based querying for horizontal scaling. All typed APIs are marked as @ApiMayChange and provide both Scala and Java variants.

Capabilities

Events by Slice Query

Query events by entity type and slice range for horizontal scaling and distributed processing.

/**
 * A plugin may optionally support this query by implementing this trait.
 * API May Change
 */
trait EventsBySliceQuery extends ReadJournal {
  /**
   * Query events for given entity type and slice range. Useful for distributing the load
   * and implementing resilient query projections.
   *
   * @param entityType The entity type to query events for
   * @param minSlice The minimum slice number (inclusive)
   * @param maxSlice The maximum slice number (inclusive)  
   * @param offset The offset to start from
   * @return Source of typed event envelopes
   */
  def eventsBySlices[Event](
    entityType: String,
    minSlice: Int,
    maxSlice: Int,
    offset: Offset
  ): Source[EventEnvelope[Event], NotUsed]
  
  /**
   * Get the slice number for a given persistence ID.
   * Useful for determining which slice a persistence ID belongs to.
   */
  def sliceForPersistenceId(persistenceId: String): Int
  
  /**
   * Get slice ranges for distributing the load across multiple query processors.
   * 
   * @param numberOfRanges Number of ranges to create
   * @return Sequence of slice ranges
   */
  def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}

Usage Examples:

import akka.persistence.query.typed.scaladsl.EventsBySliceQuery
import akka.persistence.query.typed.EventEnvelope

val readJournal: EventsBySliceQuery = getTypedReadJournal()

// Query events from specific slice range
readJournal
  .eventsBySlices[UserEvent]("User", 0, 127, offset)
  .runForeach { envelope =>
    println(s"Event: ${envelope.event} from slice ${envelope.slice}")
    println(s"Entity type: ${envelope.entityType}")
    println(s"Tags: ${envelope.tags}")
  }

// Distribute processing across multiple slices
val sliceRanges = readJournal.sliceRanges(4) // Create 4 ranges
sliceRanges.zipWithIndex.foreach { case (range, index) =>
  println(s"Processor $index handles slices ${range.start} to ${range.end}")
  
  readJournal
    .eventsBySlices[UserEvent]("User", range.start, range.end, offset)
    .runForeach { envelope =>
      processEventInProcessor(index, envelope)
    }
}

// Check which slice a persistence ID belongs to
val slice = readJournal.sliceForPersistenceId("user-12345")
println(s"user-12345 belongs to slice $slice")

Current Events by Slice Query

Query current (finite) events by entity type and slice range.

/**
 * A plugin may optionally support this query by implementing this trait.
 * API May Change
 */
trait CurrentEventsBySliceQuery extends ReadJournal {
  /**
   * Same as EventsBySliceQuery#eventsBySlices but the event stream
   * is completed immediately when it reaches the end of currently stored events.
   */
  def currentEventsBySlices[Event](
    entityType: String,
    minSlice: Int,
    maxSlice: Int,
    offset: Offset
  ): Source[EventEnvelope[Event], NotUsed]
  
  def sliceForPersistenceId(persistenceId: String): Int
  def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}

Usage Examples:

import akka.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery

val readJournal: CurrentEventsBySliceQuery = getTypedReadJournal()

// Process all current events from slice range (finite stream)
readJournal
  .currentEventsBySlices[OrderEvent]("Order", 512, 1023, offset)
  .runForeach { envelope =>
    processHistoricalEvent(envelope.event)
  }
  .onComplete {
    case Success(_) => println("Finished processing historical events")
    case Failure(ex) => println(s"Processing failed: $ex")
  }

Events by Persistence ID Typed Query

Query events for a specific persistence ID with typed envelopes.

/**
 * A plugin may optionally support this query by implementing this trait.
 * API May Change
 */
trait EventsByPersistenceIdTypedQuery extends ReadJournal {
  /**
   * Query events for a specific persistence ID with enhanced typed envelopes.
   */
  def eventsByPersistenceIdTyped[Event](
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long
  ): Source[EventEnvelope[Event], NotUsed]
}

Usage Examples:

import akka.persistence.query.typed.scaladsl.EventsByPersistenceIdTypedQuery

val readJournal: EventsByPersistenceIdTypedQuery = getTypedReadJournal()

// Query typed events for persistence ID
readJournal
  .eventsByPersistenceIdTyped[UserEvent]("user-123", 0L, Long.MaxValue)
  .runForeach { envelope =>
    println(s"Typed event: ${envelope.event}")
    println(s"Entity type: ${envelope.entityType}")
    println(s"Event tags: ${envelope.tags}")
    
    // Type-safe event processing
    envelope.event match {
      case UserCreated(name, email) => println(s"User $name created")
      case UserUpdated(name, email) => println(s"User $name updated") 
      case UserDeleted(userId) => println(s"User $userId deleted")
    }
  }

Current Events by Persistence ID Typed Query

Query current events for a specific persistence ID with typed envelopes.

/**
 * A plugin may optionally support this query by implementing this trait.
 * API May Change
 */
trait CurrentEventsByPersistenceIdTypedQuery extends ReadJournal {
  /**
   * Same as EventsByPersistenceIdTypedQuery but completed when reaching
   * the end of currently stored events.
   */
  def currentEventsByPersistenceIdTyped[Event](
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long
  ): Source[EventEnvelope[Event], NotUsed]
}

Events by Persistence ID Starting from Snapshot

Query events starting from a snapshot with transformation.

/**
 * A plugin may optionally support this query by implementing this trait.
 * API May Change
 */
trait EventsByPersistenceIdStartingFromSnapshotQuery extends ReadJournal {
  /**
   * Query events starting from a snapshot. The snapshot is loaded and transformed
   * to an event, then followed by events from the sequence number after the snapshot.
   */
  def eventsByPersistenceIdStartingFromSnapshot[Snapshot, Event](
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long,
    transformSnapshot: Snapshot => Event
  ): Source[EventEnvelope[Event], NotUsed]
}

Usage Examples:

import akka.persistence.query.typed.scaladsl.EventsByPersistenceIdStartingFromSnapshotQuery

val readJournal: EventsByPersistenceIdStartingFromSnapshotQuery = getTypedReadJournal()

// Transform snapshot to event and include in stream
readJournal
  .eventsByPersistenceIdStartingFromSnapshot[UserSnapshot, UserEvent](
    persistenceId = "user-123",
    fromSequenceNr = 0L,
    toSequenceNr = Long.MaxValue,
    transformSnapshot = snapshot => UserStateRestored(snapshot.name, snapshot.email, snapshot.preferences)
  )
  .runForeach { envelope =>
    envelope.event match {
      case UserStateRestored(name, email, prefs) => 
        println(s"Restored user state: $name")
      case otherEvent => 
        println(s"Regular event: $otherEvent")
    }
  }

Events by Slice Starting from Snapshots

Query events by slice starting from snapshots with transformation.

/**
 * A plugin may optionally support this query by implementing this trait.
 * API May Change
 */
trait EventsBySliceStartingFromSnapshotsQuery extends ReadJournal {
  /**
   * Query events by slice starting from snapshots. Snapshots are loaded and transformed
   * to events, then followed by events from the sequence number after the snapshot.
   */
  def eventsBySlicesStartingFromSnapshots[Snapshot, Event](
    entityType: String,
    minSlice: Int,
    maxSlice: Int,
    offset: Offset,
    transformSnapshot: Snapshot => Event
  ): Source[EventEnvelope[Event], NotUsed]
  
  def sliceForPersistenceId(persistenceId: String): Int
  def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}

Usage Examples:

import akka.persistence.query.typed.scaladsl.EventsBySliceStartingFromSnapshotsQuery

val readJournal: EventsBySliceStartingFromSnapshotsQuery = getTypedReadJournal()

// Query slice events starting from snapshots
readJournal
  .eventsBySlicesStartingFromSnapshots[OrderSnapshot, OrderEvent](
    entityType = "Order",
    minSlice = 0,
    maxSlice = 255,
    offset = offset,
    transformSnapshot = snapshot => OrderStateRestored(
      orderId = snapshot.orderId,
      items = snapshot.items,
      total = snapshot.total,
      status = snapshot.status
    )
  )
  .runForeach { envelope =>
    envelope.event match {
      case OrderStateRestored(orderId, items, total, status) =>
        println(s"Restored order $orderId with $total")
      case regularEvent =>
        processOrderEvent(regularEvent)
    }
  }

Load Event Query

Load individual events on demand.

/**
 * A plugin may optionally support this query by implementing this trait.
 * API May Change
 */
trait LoadEventQuery extends ReadJournal {
  /**
   * Load a specific event envelope by persistence ID and sequence number.
   * Useful for loading events that were not included in the original query result.
   */
  def loadEnvelope[Event](
    persistenceId: String,
    sequenceNr: Long
  ): Future[EventEnvelope[Event]]
}

Usage Examples:

import akka.persistence.query.typed.scaladsl.LoadEventQuery

val readJournal: LoadEventQuery = getTypedReadJournal()

// Load specific event on demand
def processEnvelopeWithLoading[Event](envelope: EventEnvelope[Event]): Future[Unit] = {
  envelope.eventOption match {
    case Some(event) => 
      Future.successful(processEvent(event))
    case None if !envelope.filtered =>
      // Event not loaded, load it on demand
      readJournal
        .loadEnvelope[Event](envelope.persistenceId, envelope.sequenceNr)
        .map(loadedEnvelope => processEvent(loadedEnvelope.event))
    case None =>
      // Event was filtered, skip processing
      Future.successful(())
  }
}

// Use with slice queries
readJournal
  .asInstanceOf[EventsBySliceQuery]
  .eventsBySlices[UserEvent]("User", 0, 127, offset)
  .mapAsync(4)(processEnvelopeWithLoading)
  .runWith(Sink.ignore)

Event Timestamp Query

Query timestamps for specific events.

/**
 * A plugin may optionally support this query by implementing this trait.
 * API May Change
 */
trait EventTimestampQuery extends ReadJournal {
  /**
   * Get the timestamp of a specific event by persistence ID and sequence number.
   * Returns None if the event doesn't exist.
   */
  def timestampOf(
    persistenceId: String,
    sequenceNr: Long
  ): Future[Option[Instant]]
}

Usage Examples:

import akka.persistence.query.typed.scaladsl.EventTimestampQuery
import java.time.Instant

val readJournal: EventTimestampQuery = getTypedReadJournal()

// Get timestamp of specific event
readJournal
  .timestampOf("user-123", 42L)
  .foreach {
    case Some(timestamp) => 
      println(s"Event 42 was stored at: $timestamp")
    case None => 
      println("Event 42 not found")
  }

// Check event age
def checkEventAge(persistenceId: String, sequenceNr: Long): Future[Unit] = {
  readJournal
    .timestampOf(persistenceId, sequenceNr)
    .map {
      case Some(timestamp) =>
        val age = java.time.Duration.between(timestamp, Instant.now())
        if (age.toDays > 30) {
          println(s"Event is ${age.toDays} days old")
        }
      case None =>
        println("Event not found")
    }
}

Slice-Based Processing Patterns

Distributed Event Processing

Use slices to distribute event processing across multiple nodes:

def startSliceProcessor(processorId: Int, totalProcessors: Int): Unit = {
  val readJournal: EventsBySliceQuery = getTypedReadJournal()
  
  // Calculate slice range for this processor
  val sliceRanges = readJournal.sliceRanges(totalProcessors)
  val myRange = sliceRanges(processorId)
  
  println(s"Processor $processorId handling slices ${myRange.start} to ${myRange.end}")
  
  readJournal
    .eventsBySlices[DomainEvent]("Entity", myRange.start, myRange.end, offset)
    .runForeach { envelope =>
      // Process events for this slice range
      processEvent(envelope.event)
      
      // Update offset for this processor
      saveProcessorOffset(processorId, envelope.offset)
    }
}

// Start 8 processors
(0 until 8).foreach(startSliceProcessor(_, 8))

Entity Type Filtering

Process different entity types with separate processors:

val entityTypes = List("User", "Order", "Payment", "Inventory")

entityTypes.foreach { entityType =>
  readJournal
    .eventsBySlices[DomainEvent](entityType, 0, 1023, offset)
    .runForeach { envelope =>
      entityType match {
        case "User" => processUserEvent(envelope.event)
        case "Order" => processOrderEvent(envelope.event)
        case "Payment" => processPaymentEvent(envelope.event)
        case "Inventory" => processInventoryEvent(envelope.event)
      }
    }
}

Snapshot Integration

Combine snapshots with event streams for efficient state reconstruction:

def buildProjectionFromSnapshots[State, Event](
  entityType: String,
  slice: Int,
  initialState: State,
  applyEvent: (State, Event) => State,
  applySnapshot: UserSnapshot => State
): Future[State] = {
  
  readJournal
    .eventsBySlicesStartingFromSnapshots[UserSnapshot, Event](
      entityType = entityType,
      minSlice = slice,
      maxSlice = slice,
      offset = TimestampOffset.Zero,
      transformSnapshot = snapshot => SnapshotRestored(snapshot).asInstanceOf[Event]
    )
    .runFold(initialState) { (state, envelope) =>
      envelope.event match {
        case SnapshotRestored(snapshot) => 
          applySnapshot(snapshot.asInstanceOf[UserSnapshot])
        case event => 
          applyEvent(state, event)
      }
    }
}

Java API

All typed Scala query traits have corresponding Java API equivalents:

import akka.persistence.query.typed.javadsl.*;
import java.util.concurrent.CompletionStage;
import java.util.List;
import akka.japi.Pair;

// Java API usage
EventsBySliceQuery readJournal = getJavaTypedReadJournal();

// Query events by slice
readJournal
    .eventsBySlices(UserEvent.class, "User", 0, 127, offset)
    .runForeach(envelope -> {
        System.out.println("Event: " + envelope.getEvent());
        System.out.println("Entity type: " + envelope.entityType());
        System.out.println("Slice: " + envelope.slice());
    }, system);

// Get slice ranges for distribution
List<Pair<Integer, Integer>> ranges = readJournal.sliceRanges(4);
ranges.forEach(range -> {
    System.out.println("Range: " + range.first() + " to " + range.second());
});

// Load event on demand
LoadEventQuery loadQuery = (LoadEventQuery) readJournal;
CompletionStage<EventEnvelope<UserEvent>> future = 
    loadQuery.loadEnvelope(UserEvent.class, "user-123", 42L);

Error Handling

Event Loading Failures

Handle failures when loading events on demand:

def safeLoadEvent[Event](
  persistenceId: String,
  sequenceNr: Long
): Future[Option[Event]] = {
  readJournal
    .asInstanceOf[LoadEventQuery]
    .loadEnvelope[Event](persistenceId, sequenceNr)
    .map(envelope => Some(envelope.event))
    .recover {
      case _: NoSuchElementException => None
      case ex => 
        println(s"Failed to load event: $ex")
        None
    }
}

Slice Processing Failures

Handle failures in slice-based processing:

def resilientSliceProcessing(slice: Int): Unit = {
  readJournal
    .eventsBySlices[DomainEvent]("Entity", slice, slice, offset)
    .recover {
      case ex: Exception =>
        println(s"Error in slice $slice: $ex")
        // Return empty envelope or restart logic
        EventEnvelope.empty
    }
    .runForeach { envelope =>
      try {
        processEvent(envelope.event)
      } catch {
        case ex: Exception =>
          println(s"Failed to process event in slice $slice: $ex")
      }
    }
}

Types

case class UserEvent(userId: String, eventType: String, data: Map[String, Any])
case class OrderEvent(orderId: String, eventType: String, data: Map[String, Any])
case class DomainEvent(entityId: String, eventType: String, payload: Any)

case class UserCreated(name: String, email: String)
case class UserUpdated(name: String, email: String)
case class UserDeleted(userId: String)
case class UserStateRestored(name: String, email: String, preferences: Map[String, String])

case class OrderStateRestored(orderId: String, items: List[String], total: BigDecimal, status: String)
case class SnapshotRestored[T](snapshot: T)

case class UserSnapshot(name: String, email: String, preferences: Map[String, String])
case class OrderSnapshot(orderId: String, items: List[String], total: BigDecimal, status: String)

import scala.concurrent.Future
import java.time.Instant
import akka.stream.scaladsl.{Source, Sink}
import akka.NotUsed
import scala.collection.immutable