The untyped (original) query API provides standardized interfaces for querying persisted events and persistence IDs. These traits define optional capabilities that read journal implementations may support.
Base marker trait for all Scala read journal implementations.
/**
* API for reading persistent events and information derived from stored persistent events.
*
* The purpose of the API is not to enforce compatibility between different
* journal implementations, because the technical capabilities may be very different.
* The interface is very open so that different journals may implement specific queries.
*/
trait ReadJournal

Query events for a specific persistent actor by its persistence ID.
/**
 * Optional capability: a live stream of the events of a single persistent actor.
 *
 * A read journal plugin opts in to this query by mixing in this trait.
 */
trait EventsByPersistenceIdQuery extends ReadJournal {

  /**
   * Streams the events of the PersistentActor identified by `persistenceId`.
   *
   * Both sequence-number bounds are inclusive; pass `0L` and `Long.MaxValue`
   * to select every event. Events should be emitted ordered by sequence number.
   *
   * This is a live query: the stream is not completed once the currently stored
   * events have been emitted, it keeps pushing events as new ones are persisted.
   *
   * @param persistenceId  identifier of the persistent actor whose events are requested
   * @param fromSequenceNr first sequence number to include (inclusive)
   * @param toSequenceNr   last sequence number to include (inclusive)
   * @return a source of event envelopes for the requested range
   */
  def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed]
}

Usage Examples:
import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery
import akka.stream.scaladsl.Sink
// A read journal that opts in to the live events-by-persistence-id query
class MyReadJournal extends ReadJournal with EventsByPersistenceIdQuery {

  /** Placeholder implementation: emits no events. Replace with journal-specific logic. */
  override def eventsByPersistenceId(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
    Source.empty
}
// Obtain a journal that supports the live events-by-persistence-id query
val readJournal: EventsByPersistenceIdQuery = getMyReadJournal()

// Live stream of every event stored for "user-123"
val allUserEvents = readJournal.eventsByPersistenceId("user-123", 0L, Long.MaxValue)
allUserEvents.runWith(Sink.foreach { envelope =>
  println(s"Event: ${envelope.event} at sequence ${envelope.sequenceNr}")
})

// Only sequence numbers 10 through 50 (both inclusive) of "order-456"
val orderEventsInRange = readJournal.eventsByPersistenceId("order-456", 10L, 50L)
orderEventsInRange.runForeach(envelope => processEvent(envelope.event))

Query current (finite) events for a specific persistent actor.
/**
 * Optional capability: a finite stream of the events of a single persistent actor.
 *
 * A read journal plugin opts in to this query by mixing in this trait.
 */
trait CurrentEventsByPersistenceIdQuery extends ReadJournal {

  /**
   * Same kind of query as `EventsByPersistenceIdQuery#eventsByPersistenceId`, except that
   * the stream is completed as soon as it reaches the end of the currently stored events.
   *
   * @param persistenceId  identifier of the persistent actor whose events are requested
   * @param fromSequenceNr lower sequence-number bound of the query
   * @param toSequenceNr   upper sequence-number bound of the query
   * @return a source of event envelopes that completes after the stored events
   */
  def currentEventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed]
}

Usage Examples:
import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
// The "current" variant yields a finite stream
val readJournal: CurrentEventsByPersistenceIdQuery = getMyReadJournal()

val historicalEvents = readJournal.currentEventsByPersistenceId("user-123", 0L, Long.MaxValue)
val replayDone = historicalEvents.runForeach(envelope => println(s"Historical event: ${envelope.event}"))

// React once the finite stream has finished (or failed)
replayDone.onComplete {
  case Failure(ex) => println(s"Failed: $ex")
  case Success(_)  => println("Finished processing historical events")
}

Query events that have a specific tag across all persistence IDs.
/**
 * Optional capability: a live stream of events carrying a given tag.
 *
 * A read journal plugin opts in to this query by mixing in this trait.
 */
trait EventsByTagQuery extends ReadJournal {

  /**
   * Streams the events that have the given tag. A tag can, for example, correspond to an
   * aggregate root type (in DDD terminology).
   *
   * A consumer can remember its position by storing the offset and, after a crash or
   * restart, run the query again from that offset.
   *
   * Events should be ordered by offset when possible; a distributed data store may not be
   * able to guarantee this, so each read journal plugin must document its ordering.
   *
   * This is a live query: the stream is not completed once the currently stored events
   * have been emitted, it keeps pushing events as new ones are persisted.
   *
   * @param tag    the tag to select events by
   * @param offset position in the tagged event stream to start from
   * @return a source of event envelopes carrying the tag
   */
  def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}

Usage Examples:
import akka.persistence.query.scaladsl.EventsByTagQuery
import akka.persistence.query.{NoOffset, Sequence}
val readJournal: EventsByTagQuery = getMyReadJournal()

// Live stream of everything tagged "user-events", starting from the beginning
val userEvents = readJournal.eventsByTag("user-events", NoOffset)
userEvents.runForeach(envelope => println(s"Tagged event: ${envelope.event} from ${envelope.persistenceId}"))

// Resume from a previously stored position
val lastProcessedOffset = Sequence(1000L)
val orderEvents = readJournal.eventsByTag("order-events", lastProcessedOffset)
orderEvents.runForeach { envelope =>
  processTaggedEvent(envelope)
  saveOffset(envelope.offset) // store the position so a later run can resume from it
}

Query current (finite) events that have a specific tag.
/**
 * Optional capability: a finite stream of events carrying a given tag.
 *
 * A read journal plugin opts in to this query by mixing in this trait.
 */
trait CurrentEventsByTagQuery extends ReadJournal {

  /**
   * Same kind of query as `EventsByTagQuery#eventsByTag`, except that the stream is
   * completed as soon as it reaches the end of the currently stored events.
   *
   * @param tag    the tag to select events by
   * @param offset position in the tagged event stream to start from
   * @return a source of event envelopes that completes after the stored events
   */
  def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}

Usage Examples:
import akka.persistence.query.scaladsl.CurrentEventsByTagQuery
val readJournal: CurrentEventsByTagQuery = getMyReadJournal()

// Finite stream: every currently stored event tagged "batch-process"
val batchEvents = readJournal.currentEventsByTag("batch-process", NoOffset)
val batchDone = batchEvents.runForeach(envelope => processBatchEvent(envelope.event))

// Report the outcome once the stream has finished (or failed)
batchDone.onComplete {
  case Failure(ex) => println(s"Batch processing failed: $ex")
  case Success(_)  => println("Batch processing complete")
}

Query all persistence IDs in the journal.
/**
 * Optional capability: a live stream of all persistence IDs.
 *
 * A read journal plugin opts in to this query by mixing in this trait.
 */
trait PersistenceIdsQuery extends ReadJournal {

  /**
   * Streams all PersistentActor identifiers, i.e. the values defined by the
   * `persistenceId` of each PersistentActor.
   *
   * This is a live query: the stream is not completed once the currently used
   * persistenceIds have been emitted, it keeps pushing new persistenceIds as new
   * persistent actors are created.
   *
   * @return a source of persistence IDs
   */
  def persistenceIds(): Source[String, NotUsed]
}

Usage Examples:
import akka.persistence.query.scaladsl.PersistenceIdsQuery
val readJournal: PersistenceIdsQuery = getMyReadJournal()

// Get all persistence IDs (live stream)
readJournal
  .persistenceIds()
  .runForeach { persistenceId =>
    println(s"Found persistence ID: $persistenceId")
    // Query events for each persistence ID. The journal is only known to be a
    // PersistenceIdsQuery here, so check for the second capability with a pattern
    // match instead of an unchecked cast that could throw ClassCastException.
    readJournal match {
      case eventsQuery: EventsByPersistenceIdQuery =>
        eventsQuery
          .eventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
          .take(10) // Limit to first 10 events
          .runForeach { envelope =>
            println(s" Event: ${envelope.event}")
          }
      case _ =>
        println(" Journal does not support events by persistence ID queries")
    }
  }

Query all current (finite) persistence IDs in the journal.
/**
 * Optional capability: a finite stream of all persistence IDs.
 *
 * A read journal plugin opts in to this query by mixing in this trait.
 */
trait CurrentPersistenceIdsQuery extends ReadJournal {

  /**
   * Same kind of query as `PersistenceIdsQuery#persistenceIds`, except that the stream is
   * completed as soon as it reaches the end of the currently used persistenceIds.
   *
   * @return a source of persistence IDs that completes after the currently used ones
   */
  def currentPersistenceIds(): Source[String, NotUsed]
}

Usage Examples:
import akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery
val readJournal: CurrentPersistenceIdsQuery = getMyReadJournal()

// Finite stream of the persistence IDs currently in use
val currentIds = readJournal.currentPersistenceIds()
val scanDone = currentIds.runForeach(persistenceId => println(s"Current persistence ID: $persistenceId"))

// Report the outcome once the scan has finished (or failed)
scanDone.onComplete {
  case Failure(ex) => println(s"Scan failed: $ex")
  case Success(_)  => println("Finished scanning persistence IDs")
}

Query persistence IDs with pagination support for large datasets.
/**
 * Optional capability: paged retrieval of the current persistence IDs.
 *
 * A ReadJournal plugin opts in to this query by mixing in this trait.
 */
trait PagedPersistenceIdsQuery extends ReadJournal {

  /**
   * Returns the current persistence IDs, one page at a time.
   *
   * A plugin is not required to page inside the database; it may simply apply the
   * drop/take Akka Streams operators to shape the result according to the paging parameters.
   *
   * @param afterId the ID to start returning results from, or `None` to return all IDs.
   *                It should be an ID returned by a previous invocation of this query.
   *                Callers should not assume that IDs are returned in sorted order.
   * @param limit   the maximum number of results to return; must be greater than zero.
   *                Use `Long.MaxValue` to return all results.
   * @return a source containing the persistence IDs, limited as specified
   */
  def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]
}

Usage Examples:
import akka.persistence.query.scaladsl.PagedPersistenceIdsQuery
import akka.stream.scaladsl.Sink

val readJournal: PagedPersistenceIdsQuery = getMyReadJournal()
val pageSize = 100L

// Get first page of persistence IDs.
// Sink.seq collects the page in linear time; appending to a List with `:+`
// inside a fold copies the list on every element.
readJournal
  .currentPersistenceIds(None, pageSize)
  .runWith(Sink.seq)
  .foreach { firstPage =>
    println(s"First page: ${firstPage.mkString(", ")}")
    // Get next page starting from last ID; lastOption covers the empty-page
    // case without a separate nonEmpty check followed by a partial `.last`.
    firstPage.lastOption.foreach { lastId =>
      readJournal
        .currentPersistenceIds(Some(lastId), pageSize)
        .runForeach { nextId =>
          println(s"Next page ID: $nextId")
        }
    }
  }

// Get all persistence IDs with unlimited results
readJournal
  .currentPersistenceIds(None, Long.MaxValue)
  .runForeach { persistenceId =>
    println(s"Persistence ID: $persistenceId")
  }

import akka.NotUsed
import akka.stream.scaladsl.Source
// Declares four parameterless accessors, `_1` through `_4`, one per type parameter.
// NOTE(review): the Scala standard library already defines `scala.Product4`;
// confirm that this local declaration is intended.
trait Product4[T1, T2, T3, T4] {
def _1: T1
def _2: T2
def _3: T3
def _4: T4
}
case object NotUsed

Combine multiple query capabilities in a single read journal:
/**
 * A read journal that mixes in six of the query capabilities at once.
 *
 * Every member declares its result type explicitly: with a bare `= ???` body the
 * compiler would infer `Nothing` for the member, which is not the type callers expect.
 */
class ComprehensiveReadJournal extends ReadJournal
    with EventsByPersistenceIdQuery
    with CurrentEventsByPersistenceIdQuery
    with EventsByTagQuery
    with CurrentEventsByTagQuery
    with PersistenceIdsQuery
    with CurrentPersistenceIdsQuery {

  // Implement all query methods

  override def eventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Source[EventEnvelope, NotUsed] = ???

  override def currentEventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Source[EventEnvelope, NotUsed] = ???

  override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = ???

  override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = ???

  override def persistenceIds(): Source[String, NotUsed] = ???

  override def currentPersistenceIds(): Source[String, NotUsed] = ???
}

Check if a read journal supports specific query capabilities:
/**
 * Prints, for each of three query capabilities, whether the given read journal supports it.
 *
 * @param readJournal the journal whose capabilities are inspected
 */
def checkCapabilities(readJournal: ReadJournal): Unit = {
  // One line of output per capability, phrased the same way for every capability
  def report(supported: Boolean, capability: String): Unit =
    println(if (supported) s"Supports $capability queries" else s"Does not support $capability queries")

  val byPersistenceId = readJournal match {
    case _: EventsByPersistenceIdQuery => true
    case _                             => false
  }
  report(byPersistenceId, "events by persistence ID")

  val byTag = readJournal match {
    case _: EventsByTagQuery => true
    case _                   => false
  }
  report(byTag, "events by tag")

  val persistenceIds = readJournal match {
    case _: PersistenceIdsQuery => true
    case _                      => false
  }
  report(persistenceIds, "persistence IDs")
}

Handle failures in query streams:
import scala.util.control.NonFatal

readJournal
  .eventsByTag("user-events", offset)
  // `recover` can only emit one final element before completing, and there is no
  // meaningful placeholder envelope to emit. `recoverWithRetries` switches to a new
  // source instead, so a timeout really does restart the query (at most 3 times).
  // The restarted query begins at the same `offset` again, so events that were
  // already handled may be delivered a second time.
  .recoverWithRetries(3, {
    case _: TimeoutException =>
      println("Query timed out, restarting...")
      readJournal.eventsByTag("user-events", offset)
  })
  .runForeach { envelope =>
    // A failure while handling one event is logged and the stream continues;
    // NonFatal lets fatal errors and interrupts propagate.
    try processEvent(envelope.event)
    catch {
      case NonFatal(ex) =>
        println(s"Failed to process event: $ex")
    }
  }

Validate offsets before using in queries:
/**
 * Checks whether an offset is acceptable as the starting position of a query.
 *
 * @param offset the offset to check
 * @return `true` for `NoOffset`, for a `Sequence` whose value is not negative, for any
 *         `TimeBasedUUID`, and for a `TimestampOffset` whose timestamp is not before the
 *         epoch; `false` for every other offset
 */
def validateOffset(offset: Offset): Boolean = {
  import akka.persistence.query.TimeBasedUUID

  offset match {
    case NoOffset => true
    case Sequence(value) => value >= 0
    // TimeBasedUUID is one of the standard offset types; without this case it fell
    // through to the catch-all and was rejected.
    // NOTE(review): assumes a constructed TimeBasedUUID needs no further checks — confirm.
    case _: TimeBasedUUID => true
    case TimestampOffset(timestamp, _, _) => !timestamp.isBefore(Instant.EPOCH)
    case _ => false
  }
}
/**
 * Runs the events-by-tag query only when `validateOffset` accepts the offset.
 *
 * @param tag    the tag to select events by
 * @param offset position in the tagged event stream to start from
 * @return the journal's `eventsByTag` source, or a source that fails with an
 *         `IllegalArgumentException` when the offset is rejected
 */
def safeEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
  if (!validateOffset(offset))
    Source.failed(new IllegalArgumentException(s"Invalid offset: $offset"))
  else
    readJournal.eventsByTag(tag, offset)

All Scala query traits have corresponding Java API equivalents in the javadsl package:
// Java API usage
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.javadsl.*;
// Look up the Java read journal identified by "my-journal"
ReadJournal readJournal =
    PersistenceQuery.get(system).getReadJournalFor(MyJavaReadJournal.class, "my-journal");

// Use the query only when the journal implements the capability
if (readJournal instanceof EventsByPersistenceIdQuery) {
  EventsByPersistenceIdQuery query = (EventsByPersistenceIdQuery) readJournal;
  query
      .eventsByPersistenceId("user-123", 0L, Long.MAX_VALUE)
      .runForeach(envelope -> System.out.println("Event: " + envelope.event()), system);
}

import akka.stream.scaladsl.Source
import akka.NotUsed
import scala.concurrent.Future
import scala.util.{Success, Failure}
// Exception type matched by the query error-handling example.
// NOTE(review): shares its simple name with java.util.concurrent.TimeoutException —
// confirm which of the two is intended.
trait TimeoutException extends Exception