Universal asynchronous stream-based query interface for querying persisted events and state changes from various journal backends with support for CQRS and event sourcing patterns.
npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-persistence-query_2-13@2.8.0Akka Persistence Query provides a universal asynchronous stream-based query interface for querying persisted events and state changes from various journal backends. It enables building reactive applications, CQRS systems, and event processors that can process persistent event streams with high throughput and reliability.
libraryDependencies += "com.typesafe.akka" %% "akka-persistence-query" % "2.8.8"import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.ReadJournal
import akka.persistence.query.{EventEnvelope, Offset}Java API:
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.javadsl.ReadJournal;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.Offset;import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.stream.scaladsl.Sink
implicit val system: ActorSystem = ActorSystem("example")
// Get read journal for querying
val readJournal = PersistenceQuery(system)
.readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
// Query events by persistence ID
readJournal
.eventsByPersistenceId("user-123", 0L, Long.MaxValue)
.runWith(Sink.foreach { envelope =>
println(s"Event: ${envelope.event} at ${envelope.sequenceNr}")
})
// Query events by tag
readJournal
.eventsByTag("user-events", Offset.noOffset)
.runWith(Sink.foreach { envelope =>
println(s"Tagged event: ${envelope.event}")
})Akka Persistence Query is built around several key components:
Main extension entry point for obtaining read journal instances from configured plugins.
object PersistenceQuery extends ExtensionId[PersistenceQuery] {
def get(system: ActorSystem): PersistenceQuery
def get(system: ClassicActorSystemProvider): PersistenceQuery
}
class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String): T
def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T
def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T
def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String, readJournalPluginConfig: Config): T
}Position tracking system for resumable queries and stream consumption.
abstract class Offset
case class Sequence(value: Long) extends Offset with Ordered[Sequence]
case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID]
case class TimestampOffset(timestamp: Instant, readTimestamp: Instant, seen: Map[String, Long]) extends Offset
case object NoOffset extends Offset
object Offset {
def noOffset: Offset
def sequence(value: Long): Offset
def timeBasedUUID(uuid: UUID): Offset
def timestamp(instant: Instant): TimestampOffset
}Event wrapper classes providing metadata for streamed events.
final class EventEnvelope(
val offset: Offset,
val persistenceId: String,
val sequenceNr: Long,
val event: Any,
val timestamp: Long,
val eventMetadata: Option[Any]
) extends Product4[Offset, String, Long, Any]Original query interface for event and persistence ID queries.
trait ReadJournal
trait EventsByPersistenceIdQuery extends ReadJournal {
def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed]
}
trait EventsByTagQuery extends ReadJournal {
def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}
trait PersistenceIdsQuery extends ReadJournal {
def persistenceIds(): Source[String, NotUsed]
}Enhanced type-safe query interface with improved event envelopes and slice-based querying.
final class EventEnvelope[Event](
val offset: Offset,
val persistenceId: String,
val sequenceNr: Long,
val eventOption: Option[Event],
val timestamp: Long,
val eventMetadata: Option[Any],
val entityType: String,
val slice: Int,
val filtered: Boolean,
val source: String,
val tags: Set[String]
) {
/** Get the event value, throwing exception if not present or filtered */
def event: Event
/** Java API: Get the event value, throwing exception if not present or filtered */
def getEvent(): Event
/** Java API: Get the optional event value */
def getOptionalEvent(): Optional[Event]
/** Java API: Get the event metadata */
def getEventMetaData(): Optional[AnyRef]
/** Java API: Get the tags */
def getTags(): java.util.Set[String]
}
trait EventsBySliceQuery extends ReadJournal {
def eventsBySlices[Event](entityType: String, minSlice: Int, maxSlice: Int, offset: Offset): Source[EventEnvelope[Event], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}Query interface for durable state changes and persistence.
sealed trait DurableStateChange[A] {
def persistenceId: String
def offset: Offset
}
final class UpdatedDurableState[A](
val persistenceId: String,
val revision: Long,
val value: A,
override val offset: Offset,
val timestamp: Long
) extends DurableStateChange[A]
trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
}Concrete read journal implementations for specific storage backends.
// LevelDB Implementation (deprecated)
class LeveldbReadJournal extends ReadJournal
with PersistenceIdsQuery
with CurrentPersistenceIdsQuery
with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery
with EventsByTagQuery
with CurrentEventsByTagQuery
object LeveldbReadJournal {
val Identifier = "akka.persistence.query.journal.leveldb"
}
// Firehose Implementation
class EventsBySliceFirehoseQuery extends ReadJournal
with EventsBySliceQuery
with EventsBySliceStartingFromSnapshotsQuery
with EventTimestampQuery
with LoadEventQuery
object EventsBySliceFirehoseQuery {
val Identifier = "akka.persistence.query.events-by-slice-firehose"
}eventsByPersistenceId and eventsByTag for continuous event processingcurrentEventsByPersistenceId and currentEventsByTag for batch processing/**
* A query plugin must implement a class that implements this trait.
* It provides the concrete implementations for the Java and Scala APIs.
*
* A read journal plugin must provide implementations for both
* scaladsl.ReadJournal and javadsl.ReadJournal.
*/
trait ReadJournalProvider {
/**
* The ReadJournal implementation for the Scala API.
* This corresponds to the instance that is returned by PersistenceQuery#readJournalFor.
*/
def scaladslReadJournal(): scaladsl.ReadJournal
/**
* The ReadJournal implementation for the Java API.
* This corresponds to the instance that is returned by PersistenceQuery#getReadJournalFor.
*/
def javadslReadJournal(): javadsl.ReadJournal
}
final class DeletedDurableState[A](
val persistenceId: String,
val revision: Long,
override val offset: Offset,
val timestamp: Long
) extends DurableStateChange[A]
object TimestampOffset {
val Zero: TimestampOffset
def apply(timestamp: Instant, seen: Map[String, Long]): TimestampOffset
def toTimestampOffset(offset: Offset): TimestampOffset
}
import java.util.Optional
import java.time.Instant
import scala.collection.immutable