The typed query API provides enhanced type-safe query interfaces with improved event envelopes and slice-based querying for horizontal scaling. All typed APIs are marked as @ApiMayChange and provide both Scala and Java variants.
Query events by entity type and slice range for horizontal scaling and distributed processing.
/**
* Read-journal capability for querying events by entity type and slice range.
*
* A plugin may optionally support this query by implementing this trait.
* API May Change
*/
trait EventsBySliceQuery extends ReadJournal {
/**
* Query events for given entity type and slice range. Useful for distributing the load
* and implementing resilient query projections.
*
* The `CurrentEventsBySliceQuery#currentEventsBySlices` variant differs in that its stream
* completes when it reaches the end of the currently stored events.
*
* @tparam Event The type of the event carried by each emitted envelope
* @param entityType The entity type to query events for
* @param minSlice The minimum slice number (inclusive)
* @param maxSlice The maximum slice number (inclusive)
* @param offset The offset to start from
* @return Source of typed event envelopes
*/
def eventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset
): Source[EventEnvelope[Event], NotUsed]
/**
* Get the slice number for a given persistence ID.
* Useful for determining which slice a persistence ID belongs to.
*
* @param persistenceId The persistence ID to look up
* @return The slice number the persistence ID belongs to
*/
def sliceForPersistenceId(persistenceId: String): Int
/**
* Get slice ranges for distributing the load across multiple query processors.
*
* NOTE(review): the element type is `Range`; whether `Range.end` is an inclusive or an
* exclusive bound depends on how the implementation builds the ranges. Deriving the
* inclusive `minSlice`/`maxSlice` arguments from `head`/`last` avoids the ambiguity —
* confirm against the plugin.
*
* @param numberOfRanges Number of ranges to create
* @return Sequence of slice ranges
*/
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}Usage Examples:
import akka.persistence.query.typed.scaladsl.EventsBySliceQuery
import akka.persistence.query.typed.EventEnvelope
val readJournal: EventsBySliceQuery = getTypedReadJournal()

// Query events from specific slice range (slices 0 through 127 of entity type "User")
readJournal
  .eventsBySlices[UserEvent]("User", 0, 127, offset)
  .runForeach { envelope =>
    println(s"Event: ${envelope.event} from slice ${envelope.slice}")
    println(s"Entity type: ${envelope.entityType}")
    println(s"Tags: ${envelope.tags}")
  }

// Distribute processing across multiple slices: one stream per slice range
val sliceRanges = readJournal.sliceRanges(4) // Create 4 ranges
sliceRanges.zipWithIndex.foreach { case (range, index) =>
  // eventsBySlices takes inclusive bounds. `Range.end` is the exclusive upper bound when the
  // range was built with `until`, which would make neighbouring processors overlap by one
  // slice; `last` is the final slice of the range in either case.
  val minSlice = range.start
  val maxSlice = range.last
  println(s"Processor $index handles slices $minSlice to $maxSlice")
  readJournal
    .eventsBySlices[UserEvent]("User", minSlice, maxSlice, offset)
    .runForeach { envelope =>
      processEventInProcessor(index, envelope)
    }
}

// Check which slice a persistence ID belongs to
val slice = readJournal.sliceForPersistenceId("user-12345")
println(s"user-12345 belongs to slice $slice")
Query current (finite) events by entity type and slice range.
/**
* Read-journal capability for finite queries of events by entity type and slice range.
*
* A plugin may optionally support this query by implementing this trait.
* API May Change
*/
trait CurrentEventsBySliceQuery extends ReadJournal {
/**
* Same as EventsBySliceQuery#eventsBySlices but the event stream
* is completed immediately when it reaches the end of currently stored events.
*
* @tparam Event The type of the event carried by each emitted envelope
* @param entityType The entity type to query events for
* @param minSlice The minimum slice number (see EventsBySliceQuery#eventsBySlices: inclusive)
* @param maxSlice The maximum slice number (see EventsBySliceQuery#eventsBySlices: inclusive)
* @param offset The offset to start from
* @return Source of typed event envelopes
*/
def currentEventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset
): Source[EventEnvelope[Event], NotUsed]
/**
* Get the slice number for a given persistence ID.
* Same signature as EventsBySliceQuery#sliceForPersistenceId.
*/
def sliceForPersistenceId(persistenceId: String): Int
/**
* Get slice ranges for distributing the load across multiple query processors.
* Same signature as EventsBySliceQuery#sliceRanges.
*
* @param numberOfRanges Number of ranges to create
* @return Sequence of slice ranges
*/
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}Usage Examples:
import akka.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery
val readJournal: CurrentEventsBySliceQuery = getTypedReadJournal()

// Finite stream: it completes once the currently stored events of the slice range are consumed.
val historicalRun =
  readJournal
    .currentEventsBySlices[OrderEvent]("Order", 512, 1023, offset)
    .runForeach(envelope => processHistoricalEvent(envelope.event))

// Report how the run ended.
historicalRun.onComplete {
  case Failure(ex) => println(s"Processing failed: $ex")
  case Success(_) => println("Finished processing historical events")
}
Query events for a specific persistence ID with typed envelopes.
/**
* Read-journal capability for querying the events of one persistence ID as typed envelopes.
*
* A plugin may optionally support this query by implementing this trait.
* API May Change
*/
trait EventsByPersistenceIdTypedQuery extends ReadJournal {
/**
* Query events for a specific persistence ID with enhanced typed envelopes.
*
* @tparam Event The type of the event carried by each emitted envelope
* @param persistenceId The persistence ID whose events are queried
* @param fromSequenceNr Lower sequence number bound (presumably inclusive — confirm)
* @param toSequenceNr Upper sequence number bound (presumably inclusive — confirm)
* @return Source of typed event envelopes
*/
def eventsByPersistenceIdTyped[Event](
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long
): Source[EventEnvelope[Event], NotUsed]
}Usage Examples:
import akka.persistence.query.typed.scaladsl.EventsByPersistenceIdTypedQuery
val readJournal: EventsByPersistenceIdTypedQuery = getTypedReadJournal()

// Query typed events for persistence ID
readJournal
  .eventsByPersistenceIdTyped[UserEvent]("user-123", 0L, Long.MaxValue)
  .runForeach { envelope =>
    println(s"Typed event: ${envelope.event}")
    println(s"Entity type: ${envelope.entityType}")
    println(s"Event tags: ${envelope.tags}")

    // Type-safe event processing: build the message per event type, then print it.
    // Fields that are not part of the message are ignored with `_`.
    val summary = envelope.event match {
      case UserDeleted(userId) => s"User $userId deleted"
      case UserUpdated(name, _) => s"User $name updated"
      case UserCreated(name, _) => s"User $name created"
    }
    println(summary)
  }
Query current events for a specific persistence ID with typed envelopes.
/**
* Read-journal capability for finite queries of one persistence ID's events as typed envelopes.
*
* A plugin may optionally support this query by implementing this trait.
* API May Change
*/
trait CurrentEventsByPersistenceIdTypedQuery extends ReadJournal {
/**
* Same as EventsByPersistenceIdTypedQuery but completed when reaching
* the end of currently stored events.
*
* @tparam Event The type of the event carried by each emitted envelope
* @param persistenceId The persistence ID whose events are queried
* @param fromSequenceNr Lower sequence number bound (presumably inclusive — confirm)
* @param toSequenceNr Upper sequence number bound (presumably inclusive — confirm)
* @return Source of typed event envelopes
*/
def currentEventsByPersistenceIdTyped[Event](
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long
): Source[EventEnvelope[Event], NotUsed]
}Query events starting from a snapshot with transformation.
/**
* Read-journal capability for querying one persistence ID's events, starting from its snapshot.
*
* A plugin may optionally support this query by implementing this trait.
* API May Change
*/
trait EventsByPersistenceIdStartingFromSnapshotQuery extends ReadJournal {
/**
* Query events starting from a snapshot. The snapshot is loaded and transformed
* to an event, then followed by events from the sequence number after the snapshot.
*
* @tparam Snapshot The type of the stored snapshot
* @tparam Event The type of the event carried by each emitted envelope
* @param persistenceId The persistence ID whose snapshot and events are queried
* @param fromSequenceNr Lower sequence number bound (presumably inclusive — confirm)
* @param toSequenceNr Upper sequence number bound (presumably inclusive — confirm)
* @param transformSnapshot Converts the loaded snapshot into the event emitted in its place
* @return Source of typed event envelopes
*/
def eventsByPersistenceIdStartingFromSnapshot[Snapshot, Event](
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long,
transformSnapshot: Snapshot => Event
): Source[EventEnvelope[Event], NotUsed]
}Usage Examples:
import akka.persistence.query.typed.scaladsl.EventsByPersistenceIdStartingFromSnapshotQuery
val readJournal: EventsByPersistenceIdStartingFromSnapshotQuery = getTypedReadJournal()

// Transform snapshot to event and include in stream
val restoreUserState = (snap: UserSnapshot) =>
  UserStateRestored(snap.name, snap.email, snap.preferences)

readJournal
  .eventsByPersistenceIdStartingFromSnapshot[UserSnapshot, UserEvent](
    persistenceId = "user-123",
    fromSequenceNr = 0L,
    toSequenceNr = Long.MaxValue,
    transformSnapshot = restoreUserState
  )
  .runForeach { envelope =>
    // The snapshot-derived event is reported differently from the events that follow it.
    val line = envelope.event match {
      case UserStateRestored(name, _, _) => s"Restored user state: $name"
      case otherEvent => s"Regular event: $otherEvent"
    }
    println(line)
  }
Query events by slice starting from snapshots with transformation.
/**
* Read-journal capability for querying events by slice range, starting from snapshots.
*
* A plugin may optionally support this query by implementing this trait.
* API May Change
*/
trait EventsBySliceStartingFromSnapshotsQuery extends ReadJournal {
/**
* Query events by slice starting from snapshots. Snapshots are loaded and transformed
* to events, then followed by events from the sequence number after the snapshot.
*
* @tparam Snapshot The type of the stored snapshots
* @tparam Event The type of the event carried by each emitted envelope
* @param entityType The entity type to query events for
* @param minSlice The minimum slice number (see EventsBySliceQuery#eventsBySlices: inclusive)
* @param maxSlice The maximum slice number (see EventsBySliceQuery#eventsBySlices: inclusive)
* @param offset The offset to start from
* @param transformSnapshot Converts a loaded snapshot into the event emitted in its place
* @return Source of typed event envelopes
*/
def eventsBySlicesStartingFromSnapshots[Snapshot, Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset,
transformSnapshot: Snapshot => Event
): Source[EventEnvelope[Event], NotUsed]
/**
* Get the slice number for a given persistence ID.
* Same signature as EventsBySliceQuery#sliceForPersistenceId.
*/
def sliceForPersistenceId(persistenceId: String): Int
/**
* Get slice ranges for distributing the load across multiple query processors.
* Same signature as EventsBySliceQuery#sliceRanges.
*
* @param numberOfRanges Number of ranges to create
* @return Sequence of slice ranges
*/
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}Usage Examples:
import akka.persistence.query.typed.scaladsl.EventsBySliceStartingFromSnapshotsQuery
val readJournal: EventsBySliceStartingFromSnapshotsQuery = getTypedReadJournal()

// Maps an order snapshot onto the event that represents it in the stream.
val restoreOrderState = (snap: OrderSnapshot) =>
  OrderStateRestored(snap.orderId, snap.items, snap.total, snap.status)

// Query slice events starting from snapshots
readJournal
  .eventsBySlicesStartingFromSnapshots[OrderSnapshot, OrderEvent](
    entityType = "Order",
    minSlice = 0,
    maxSlice = 255,
    offset = offset,
    transformSnapshot = restoreOrderState
  )
  .runForeach { envelope =>
    envelope.event match {
      // Only the order id and the total are reported for a restored snapshot.
      case OrderStateRestored(orderId, _, total, _) =>
        println(s"Restored order $orderId with $total")
      case regularEvent =>
        processOrderEvent(regularEvent)
    }
  }
Load individual events on demand.
/**
* Read-journal capability for loading a single event envelope on demand.
*
* A plugin may optionally support this query by implementing this trait.
* API May Change
*/
trait LoadEventQuery extends ReadJournal {
/**
* Load a specific event envelope by persistence ID and sequence number.
* Useful for loading events that were not included in the original query result.
*
* NOTE(review): how a missing event is reported (failed Future and with which exception)
* is not specified here — confirm against the plugin.
*
* @tparam Event The type of the event carried by the loaded envelope
* @param persistenceId The persistence ID of the event to load
* @param sequenceNr The sequence number of the event to load
* @return Future completed with the loaded envelope
*/
def loadEnvelope[Event](
persistenceId: String,
sequenceNr: Long
): Future[EventEnvelope[Event]]
}Usage Examples:
import akka.persistence.query.typed.scaladsl.LoadEventQuery
val readJournal: LoadEventQuery = getTypedReadJournal()

/**
 * Load specific event on demand: processes the event of an envelope, fetching it from the
 * journal first when the envelope arrived without one.
 *
 * An envelope without an event is skipped when it is flagged as filtered.
 *
 * @param envelope Envelope whose event should be processed
 * @return Future completed once the event has been processed (or skipped)
 */
def processEnvelopeWithLoading[Event](envelope: EventEnvelope[Event]): Future[Unit] =
  envelope.eventOption.fold[Future[Unit]] {
    if (envelope.filtered) {
      // Event was filtered, skip processing
      Future.successful(())
    } else {
      // Event not loaded, load it on demand
      readJournal
        .loadEnvelope[Event](envelope.persistenceId, envelope.sequenceNr)
        .map(loadedEnvelope => processEvent(loadedEnvelope.event))
    }
  } { event =>
    Future.successful(processEvent(event))
  }
// Use with slice queries. The journal is cast to EventsBySliceQuery, so the underlying
// plugin has to implement that trait as well or the cast fails at runtime.
val sliceQuery = readJournal.asInstanceOf[EventsBySliceQuery]
sliceQuery
  .eventsBySlices[UserEvent]("User", 0, 127, offset)
  .mapAsync(4)(envelope => processEnvelopeWithLoading(envelope))
  .runWith(Sink.ignore)
Query timestamps for specific events.
/**
* Read-journal capability for looking up the timestamp of a single event.
*
* A plugin may optionally support this query by implementing this trait.
* API May Change
*/
trait EventTimestampQuery extends ReadJournal {
/**
* Get the timestamp of a specific event by persistence ID and sequence number.
* Returns None if the event doesn't exist.
*
* @param persistenceId The persistence ID of the event
* @param sequenceNr The sequence number of the event
* @return Future completed with the event's timestamp, or None if the event doesn't exist
*/
def timestampOf(
persistenceId: String,
sequenceNr: Long
): Future[Option[Instant]]
}Usage Examples:
import akka.persistence.query.typed.scaladsl.EventTimestampQuery
import java.time.Instant
val readJournal: EventTimestampQuery = getTypedReadJournal()

// Get timestamp of specific event; the empty case is the first argument of fold.
readJournal
  .timestampOf("user-123", 42L)
  .foreach { lookup =>
    lookup.fold(println("Event 42 not found")) { timestamp =>
      println(s"Event 42 was stored at: $timestamp")
    }
  }
/**
 * Check event age: prints a message when the event is more than 30 days old, or when it
 * cannot be found.
 *
 * @param persistenceId The persistence ID of the event
 * @param sequenceNr    The sequence number of the event
 * @return Future completed after the check has run
 */
def checkEventAge(persistenceId: String, sequenceNr: Long): Future[Unit] =
  for (storedAt <- readJournal.timestampOf(persistenceId, sequenceNr)) yield {
    storedAt match {
      case None =>
        println("Event not found")
      case Some(timestamp) =>
        val age = java.time.Duration.between(timestamp, Instant.now())
        if (age.toDays > 30) {
          println(s"Event is ${age.toDays} days old")
        }
    }
  }
Use slices to distribute event processing across multiple nodes:
/**
 * Starts the event stream for one processor out of a group that splits the slices between them.
 *
 * @param processorId     Zero-based index of this processor; selects the slice range it handles
 * @param totalProcessors Number of processors, used as the number of slice ranges to create
 */
def startSliceProcessor(processorId: Int, totalProcessors: Int): Unit = {
  val readJournal: EventsBySliceQuery = getTypedReadJournal()

  // Calculate slice range for this processor
  val sliceRanges = readJournal.sliceRanges(totalProcessors)
  val myRange = sliceRanges(processorId)

  // eventsBySlices takes inclusive bounds. `Range.end` is the exclusive upper bound when the
  // range was built with `until`, which would make neighbouring processors overlap by one
  // slice; `last` is the final slice of the range in either case.
  val minSlice = myRange.start
  val maxSlice = myRange.last
  println(s"Processor $processorId handling slices $minSlice to $maxSlice")

  readJournal
    .eventsBySlices[DomainEvent]("Entity", minSlice, maxSlice, offset)
    .runForeach { envelope =>
      // Process events for this slice range
      processEvent(envelope.event)
      // Update offset for this processor
      saveProcessorOffset(processorId, envelope.offset)
    }
}
// Start 8 processors
for (processorId <- 0 until 8) startSliceProcessor(processorId, 8)
Process different entity types with separate processors:
val entityTypes = List("User", "Order", "Payment", "Inventory")

// One independent stream per entity type, each covering slices 0 through 1023.
for (entityType <- entityTypes) {
  readJournal
    .eventsBySlices[DomainEvent](entityType, 0, 1023, offset)
    .runForeach { envelope =>
      // Route the event to the handler that belongs to the stream's entity type.
      entityType match {
        case "Inventory" => processInventoryEvent(envelope.event)
        case "Payment" => processPaymentEvent(envelope.event)
        case "Order" => processOrderEvent(envelope.event)
        case "User" => processUserEvent(envelope.event)
      }
    }
}
Combine snapshots with event streams for efficient state reconstruction:
/**
 * Folds the snapshot-seeded event stream of a single slice into a state value.
 *
 * A snapshot replaces the accumulated state with `applySnapshot`; every other event is
 * combined with the current state through `applyEvent`.
 *
 * @param entityType    The entity type to query
 * @param slice         The single slice to read (used as both minimum and maximum slice)
 * @param initialState  State the fold starts from
 * @param applyEvent    Combines the current state with a regular event
 * @param applySnapshot Builds a state from a snapshot
 * @return Future completed with the final state when the stream completes
 */
def buildProjectionFromSnapshots[State, Event](
    entityType: String,
    slice: Int,
    initialState: State,
    applyEvent: (State, Event) => State,
    applySnapshot: UserSnapshot => State
): Future[State] = {
  // Snapshots travel through the stream wrapped in SnapshotRestored so the fold can tell
  // them apart from regular events. The cast to Event is unchecked.
  val wrapSnapshot: UserSnapshot => Event =
    snap => SnapshotRestored(snap).asInstanceOf[Event]

  val envelopes = readJournal.eventsBySlicesStartingFromSnapshots[UserSnapshot, Event](
    entityType = entityType,
    minSlice = slice,
    maxSlice = slice,
    offset = TimestampOffset.Zero,
    transformSnapshot = wrapSnapshot
  )

  envelopes.runFold(initialState) { (current, envelope) =>
    envelope.event match {
      case SnapshotRestored(restored) =>
        applySnapshot(restored.asInstanceOf[UserSnapshot])
      case regular =>
        applyEvent(current, regular)
    }
  }
}
All typed Scala query traits have corresponding Java API equivalents:
import akka.persistence.query.typed.javadsl.*;
import java.util.concurrent.CompletionStage;
import java.util.List;
import akka.japi.Pair;
// Java API usage
EventsBySliceQuery readJournal = getJavaTypedReadJournal();

// Query events by slice. The event type is a type parameter of the method, as in the
// Scala trait, so it is supplied as an explicit type witness and not as a Class argument.
readJournal
    .<UserEvent>eventsBySlices("User", 0, 127, offset)
    .runForeach(envelope -> {
        System.out.println("Event: " + envelope.getEvent());
        System.out.println("Entity type: " + envelope.entityType());
        System.out.println("Slice: " + envelope.slice());
    }, system);

// Get slice ranges for distribution
List<Pair<Integer, Integer>> ranges = readJournal.sliceRanges(4);
ranges.forEach(range -> {
    System.out.println("Range: " + range.first() + " to " + range.second());
});

// Load event on demand. The event type is inferred from the declared result type.
LoadEventQuery loadQuery = (LoadEventQuery) readJournal;
CompletionStage<EventEnvelope<UserEvent>> future =
    loadQuery.loadEnvelope("user-123", 42L);
Handle failures when loading events on demand:
/**
 * Loads a single event and turns every failure into `None`.
 *
 * A `NoSuchElementException` is mapped to `None` silently; any other failure is printed
 * before `None` is returned.
 *
 * @param persistenceId The persistence ID of the event to load
 * @param sequenceNr    The sequence number of the event to load
 * @return Future completed with the event, or with `None` when loading failed
 */
def safeLoadEvent[Event](
    persistenceId: String,
    sequenceNr: Long
): Future[Option[Event]] = {
  // The journal is cast to LoadEventQuery; the cast fails at runtime if the plugin
  // does not implement that trait.
  val loader = readJournal.asInstanceOf[LoadEventQuery]
  val loaded: Future[Option[Event]] =
    loader
      .loadEnvelope[Event](persistenceId, sequenceNr)
      .map(loadedEnvelope => Some(loadedEnvelope.event))

  loaded.recover {
    case _: NoSuchElementException => None
    case ex =>
      println(s"Failed to load event: $ex")
      None
  }
}
Handle failures in slice-based processing:
/**
 * Runs the event stream of a single slice, logging failures instead of propagating them.
 *
 * A failure of the stream itself is printed and ends the stream; a failure while processing
 * one event is printed and the stream carries on with the next event.
 *
 * @param slice The slice to process (used as both minimum and maximum slice)
 */
def resilientSliceProcessing(slice: Int): Unit = {
  readJournal
    .eventsBySlices[DomainEvent]("Entity", slice, slice, offset)
    // On a stream failure, log it and switch to an empty source so the stream completes
    // without emitting a substitute envelope (a placeholder element would be handed to
    // processEvent below as if it were a real event). Restart logic, if wanted, belongs
    // around this stream.
    .recoverWithRetries(1, {
      case ex: Exception =>
        println(s"Error in slice $slice: $ex")
        Source.empty[EventEnvelope[DomainEvent]]
    })
    .runForeach { envelope =>
      try {
        processEvent(envelope.event)
      } catch {
        case ex: Exception =>
          println(s"Failed to process event in slice $slice: $ex")
      }
    }
}
case class UserEvent(userId: String, eventType: String, data: Map[String, Any])
// Example domain types referenced by the usage snippets in this document.

// Payload types used as the `Event` type argument of the slice queries.
case class OrderEvent(orderId: String, eventType: String, data: Map[String, Any])
case class DomainEvent(entityId: String, eventType: String, payload: Any)
// User events pattern-matched in the eventsByPersistenceIdTyped usage example.
// NOTE(review): that example matches these against envelopes typed as UserEvent, but as
// declared here they do not extend UserEvent — confirm the intended hierarchy.
case class UserCreated(name: String, email: String)
case class UserUpdated(name: String, email: String)
case class UserDeleted(userId: String)
// Events produced by the transformSnapshot functions in the snapshot usage examples.
case class UserStateRestored(name: String, email: String, preferences: Map[String, String])
case class OrderStateRestored(orderId: String, items: List[String], total: BigDecimal, status: String)
// Generic wrapper used by buildProjectionFromSnapshots to mark a snapshot inside the event stream.
case class SnapshotRestored[T](snapshot: T)
// Snapshot types used as the `Snapshot` type argument of the snapshot queries.
case class UserSnapshot(name: String, email: String, preferences: Map[String, String])
case class OrderSnapshot(orderId: String, items: List[String], total: BigDecimal, status: String)
import scala.concurrent.Future
import java.time.Instant
import akka.stream.scaladsl.{Source, Sink}
import akka.NotUsed
import scala.collection.immutable