tessl/maven-com-typesafe-akka--akka-persistence-query_2-13

Universal asynchronous stream-based query interface for querying persisted events and state changes from various journal backends with support for CQRS and event sourcing patterns.

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/com.typesafe.akka/akka-persistence-query_2.13@2.8.x

To install, run

npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-persistence-query_2-13@2.8.0

Akka Persistence Query

Akka Persistence Query provides a universal asynchronous stream-based query interface for querying persisted events and state changes from various journal backends. It is meant for building the read side of CQRS systems, projections, and other event processors that consume persisted event streams through Akka Streams with backpressure.

Package Information

  • Package Name: akka-persistence-query_2.13
  • Package Type: maven
  • Language: Scala
  • Installation: libraryDependencies += "com.typesafe.akka" %% "akka-persistence-query" % "2.8.8"

Core Imports

import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.ReadJournal
import akka.persistence.query.{EventEnvelope, Offset}

Java API:

import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.javadsl.ReadJournal;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.Offset;

Basic Usage

import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.stream.scaladsl.Sink

implicit val system: ActorSystem = ActorSystem("example")

// Get read journal for querying
val readJournal = PersistenceQuery(system)
  .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

// Query events by persistence ID
readJournal
  .eventsByPersistenceId("user-123", 0L, Long.MaxValue)
  .runWith(Sink.foreach { envelope =>
    println(s"Event: ${envelope.event} at ${envelope.sequenceNr}")
  })

// Query events by tag
readJournal
  .eventsByTag("user-events", Offset.noOffset)
  .runWith(Sink.foreach { envelope =>
    println(s"Tagged event: ${envelope.event}")
  })

Architecture

Akka Persistence Query is built around several key components:

  • Read Journal Interface: Pluggable backend implementations for different storage systems
  • Query API: Standardized interface for event and state queries across different journal implementations
  • Offset System: Position tracking mechanism for resumable stream consumption
  • Event Envelopes: Metadata wrappers containing events, offsets, timestamps, and persistence information
  • Streaming Integration: Built on Akka Streams for backpressure-aware event processing
  • Dual API: Both untyped (original) and typed APIs with Scala and Java variants

Capabilities

Extension and Configuration

Main extension entry point for obtaining read journal instances from configured plugins.

object PersistenceQuery extends ExtensionId[PersistenceQuery] {
  def get(system: ActorSystem): PersistenceQuery
  def get(system: ClassicActorSystemProvider): PersistenceQuery
}

class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
  def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String): T
  def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T
  def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T
  def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String, readJournalPluginConfig: Config): T
}

Offset Management

Position tracking system for resumable queries and stream consumption.

abstract class Offset

case class Sequence(value: Long) extends Offset with Ordered[Sequence]
case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID]
final class TimestampOffset(val timestamp: Instant, val readTimestamp: Instant, val seen: Map[String, Long]) extends Offset
case object NoOffset extends Offset

object Offset {
  def noOffset: Offset
  def sequence(value: Long): Offset
  def timeBasedUUID(uuid: UUID): Offset
  def timestamp(instant: Instant): TimestampOffset
}
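
A consumer that stores offsets usually has to handle every offset variant when it resumes. The following is a minimal sketch of that dispatch; `OffsetExample` and `describe` are hypothetical names used only for illustration, and whether a stored offset is treated as inclusive or exclusive is decided by the journal plugin.

```scala
import akka.persistence.query.{NoOffset, Offset, Sequence, TimeBasedUUID, TimestampOffset}

object OffsetExample {
  // Hypothetical helper: turn a stored offset into a human-readable resume point.
  def describe(offset: Offset): String = offset match {
    case NoOffset            => "start from the beginning"
    case Sequence(value)     => s"resume at sequence number $value"
    case TimeBasedUUID(uuid) => s"resume at time-based UUID $uuid"
    case t: TimestampOffset  => s"resume at timestamp ${t.timestamp}"
    case other               => s"plugin-specific offset: $other"
  }
}
```

Because `Sequence` and `TimeBasedUUID` are `Ordered`, two offsets of the same kind can also be compared directly, for example `Sequence(1L) < Sequence(2L)`.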

Event Envelopes

Event wrapper classes providing metadata for streamed events.

final class EventEnvelope(
  val offset: Offset,
  val persistenceId: String,
  val sequenceNr: Long,
  val event: Any,
  val timestamp: Long,
  val eventMetadata: Option[Any]
) extends Product4[Offset, String, Long, Any]
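
Since the envelope carries the event as `Any`, consumers typically pattern match on it. The sketch below assumes the four-field extractor on the `EventEnvelope` companion (offset, persistence ID, sequence number, event); `UserRenamed` and `summarize` are hypothetical names introduced for the example.

```scala
import akka.persistence.query.EventEnvelope

object EnvelopeExample {
  // Hypothetical domain event, not part of the library.
  final case class UserRenamed(newName: String)

  // Match on the envelope fields and on the concrete event type in one step.
  def summarize(envelope: EventEnvelope): String = envelope match {
    case EventEnvelope(_, persistenceId, sequenceNr, UserRenamed(name)) =>
      s"$persistenceId#$sequenceNr renamed to $name"
    case other =>
      s"${other.persistenceId}#${other.sequenceNr} carries an unhandled event"
  }
}
```

The timestamp and metadata are not part of the extractor; read them from `envelope.timestamp` and `envelope.eventMetadata` when needed.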

Untyped Query API

Original query interface for event and persistence ID queries.

trait ReadJournal

trait EventsByPersistenceIdQuery extends ReadJournal {
  def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed]
}

trait EventsByTagQuery extends ReadJournal {
  def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}

trait PersistenceIdsQuery extends ReadJournal {
  def persistenceIds(): Source[String, NotUsed]
}
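
Each query is a separate trait, so a given plugin may support only some of them. As a rough sketch, code that receives a plain `ReadJournal` can check for a capability before using it; `UntypedQueryExample` and `history` are hypothetical names, and `CurrentEventsByPersistenceIdQuery` is the finite counterpart of `EventsByPersistenceIdQuery`.

```scala
import akka.NotUsed
import akka.persistence.query.EventEnvelope
import akka.persistence.query.scaladsl.{CurrentEventsByPersistenceIdQuery, ReadJournal}
import akka.stream.scaladsl.Source

object UntypedQueryExample {
  // Hypothetical helper: all currently stored events of one entity as a finite stream.
  def history(journal: ReadJournal, persistenceId: String): Source[EventEnvelope, NotUsed] =
    journal match {
      case query: CurrentEventsByPersistenceIdQuery =>
        query.currentEventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
      case _ =>
        // The plugin does not mix in this query trait.
        Source.failed(new UnsupportedOperationException(
          "read journal does not support currentEventsByPersistenceId"))
    }
}
```

In application code it is usually simpler to request the concrete journal type from `readJournalFor`, so that unsupported queries are caught at compile time.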

Typed Query API

Type-safe query interface in the akka.persistence.query.typed package, with a richer event envelope (entity type, slice, tags) and slice-based querying.

final class EventEnvelope[Event](
  val offset: Offset,
  val persistenceId: String,
  val sequenceNr: Long,
  val eventOption: Option[Event],
  val timestamp: Long,
  val eventMetadata: Option[Any],
  val entityType: String,
  val slice: Int,
  val filtered: Boolean,
  val source: String,
  val tags: Set[String]
) {
  /** Get the event value; throws an exception if the event is not present or was filtered */
  def event: Event

  /** Java API: Get the event value; throws an exception if the event is not present or was filtered */
  def getEvent(): Event
  
  /** Java API: Get the optional event value */
  def getOptionalEvent(): Optional[Event]
  
  /** Java API: Get the event metadata */
  def getEventMetaData(): Optional[AnyRef]
  
  /** Java API: Get the tags */
  def getTags(): java.util.Set[String]
}

trait EventsBySliceQuery extends ReadJournal {
  def eventsBySlices[Event](entityType: String, minSlice: Int, maxSlice: Int, offset: Offset): Source[EventEnvelope[Event], NotUsed]
  def sliceForPersistenceId(persistenceId: String): Int
  def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}
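
Slice ranges are what make it possible to split one entity type across several consumers. A minimal sketch, assuming a journal that implements `EventsBySliceQuery`: it asks the journal for `numberOfRanges` ranges and builds one source per range. `SliceExample` and `streamsPerRange` are hypothetical names, and every stream here starts from `NoOffset` rather than from a stored offset.

```scala
import akka.NotUsed
import akka.persistence.query.NoOffset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.EventsBySliceQuery
import akka.stream.scaladsl.Source

object SliceExample {
  // Hypothetical helper: one event source per slice range, e.g. one per projection instance.
  def streamsPerRange[Event](
      query: EventsBySliceQuery,
      entityType: String,
      numberOfRanges: Int): Seq[Source[EventEnvelope[Event], NotUsed]] =
    query.sliceRanges(numberOfRanges).map { range =>
      query.eventsBySlices[Event](entityType, range.min, range.max, NoOffset)
    }
}
```

Each returned source can then be run by a different node or worker, with its own offset stored per range.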

Durable State Queries

Query interface for streaming changes to durable state, selected by tag and resumable from an offset.

sealed trait DurableStateChange[A] {
  def persistenceId: String
  def offset: Offset
}

final class UpdatedDurableState[A](
  val persistenceId: String,
  val revision: Long,
  val value: A,
  override val offset: Offset,
  val timestamp: Long
) extends DurableStateChange[A]

trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
  def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
  def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
}
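
A stream of `DurableStateChange` elements can be folded into a read-side view. The sketch below handles both change types, using `DeletedDurableState` as listed under Types; `DurableStateExample`, `applyChange`, and the map-based view are hypothetical and stand in for a real read model.

```scala
import akka.persistence.query.{DeletedDurableState, DurableStateChange, UpdatedDurableState}

object DurableStateExample {
  // Hypothetical read-side view: the latest known value per persistence ID.
  def applyChange[A](view: Map[String, A], change: DurableStateChange[A]): Map[String, A] =
    change match {
      case updated: UpdatedDurableState[A] =>
        view.updated(updated.persistenceId, updated.value)
      case deleted: DeletedDurableState[A] =>
        view - deleted.persistenceId
    }
}
```

With a real store, the same function could be applied inside a `fold` or `scan` stage on the source returned by `changes` or `currentChanges`.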

Journal Implementations

Concrete read journal implementations for specific storage backends.

// LevelDB Implementation (deprecated)
class LeveldbReadJournal extends ReadJournal 
  with PersistenceIdsQuery 
  with CurrentPersistenceIdsQuery 
  with EventsByPersistenceIdQuery 
  with CurrentEventsByPersistenceIdQuery 
  with EventsByTagQuery 
  with CurrentEventsByTagQuery

object LeveldbReadJournal {
  val Identifier = "akka.persistence.query.journal.leveldb"
}

// Firehose Implementation
class EventsBySliceFirehoseQuery extends ReadJournal 
  with EventsBySliceQuery
  with EventsBySliceStartingFromSnapshotsQuery
  with EventTimestampQuery
  with LoadEventQuery

object EventsBySliceFirehoseQuery {
  val Identifier = "akka.persistence.query.events-by-slice-firehose"
}

Common Query Patterns

  • Live Streaming: Use eventsByPersistenceId and eventsByTag for continuous event processing
  • Finite Queries: Use currentEventsByPersistenceId and currentEventsByTag for batch processing
  • Horizontal Scaling: Use typed slice-based queries for distributed event processing
  • State Reconstruction: Combine snapshot and event queries for efficient state rebuilding
  • CQRS Read Models: Build projections using event streams with offset tracking
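
The offset-tracking pattern for read models can be sketched as follows, assuming a journal that implements `EventsByTagQuery`. `ProjectionExample`, `resumableProjection`, and `updateReadModel` are hypothetical names; how and where the emitted offsets are persisted is left to the caller.

```scala
import akka.NotUsed
import akka.persistence.query.{EventEnvelope, Offset}
import akka.persistence.query.scaladsl.EventsByTagQuery
import akka.stream.scaladsl.Source

object ProjectionExample {
  // Hypothetical helper: apply each tagged event to the read model,
  // then emit the offset of that event so the caller can store it.
  def resumableProjection(journal: EventsByTagQuery, tag: String, savedOffset: Offset)(
      updateReadModel: EventEnvelope => Unit): Source[Offset, NotUsed] =
    journal
      .eventsByTag(tag, savedOffset)
      .map { envelope =>
        updateReadModel(envelope)
        envelope.offset
      }
}
```

On the first run the caller would pass `Offset.noOffset`; after a restart it would pass the last stored offset. Because the offset is emitted only after the read model was updated, an event may be processed again after a crash, so the update should be idempotent.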

Types

/**
 * A query plugin must implement a class that implements this trait.
 * It provides the concrete implementations for the Java and Scala APIs.
 *
 * A read journal plugin must provide implementations for both
 * scaladsl.ReadJournal and javadsl.ReadJournal.
 */
trait ReadJournalProvider {
  /**
   * The ReadJournal implementation for the Scala API.
   * This corresponds to the instance that is returned by PersistenceQuery#readJournalFor.
   */
  def scaladslReadJournal(): scaladsl.ReadJournal
  
  /**
   * The ReadJournal implementation for the Java API.
   * This corresponds to the instance that is returned by PersistenceQuery#getReadJournalFor.
   */
  def javadslReadJournal(): javadsl.ReadJournal
}
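
To illustrate the provider contract, here is a minimal sketch of a plugin that serves a fixed list of persistence IDs. All three class names and the hard-coded IDs are hypothetical; a real plugin would read from its storage backend and would be registered in configuration with a `class` entry pointing at the provider.

```scala
import akka.NotUsed
import akka.persistence.query.{ReadJournalProvider, javadsl, scaladsl}
import akka.stream.scaladsl.Source

// Scala API: supports only the finite persistence ID query.
class StaticScalaReadJournal extends scaladsl.ReadJournal with scaladsl.CurrentPersistenceIdsQuery {
  override def currentPersistenceIds(): Source[String, NotUsed] =
    Source(List("user-1", "user-2")) // hard-coded sample data
}

// Java API: delegates to the Scala implementation and converts the stream type.
class StaticJavaReadJournal(underlying: StaticScalaReadJournal)
    extends javadsl.ReadJournal
    with javadsl.CurrentPersistenceIdsQuery {
  override def currentPersistenceIds(): akka.stream.javadsl.Source[String, NotUsed] =
    underlying.currentPersistenceIds().asJava
}

class StaticReadJournalProvider extends ReadJournalProvider {
  private val scalaJournal = new StaticScalaReadJournal
  override def scaladslReadJournal(): scaladsl.ReadJournal = scalaJournal
  override def javadslReadJournal(): javadsl.ReadJournal = new StaticJavaReadJournal(scalaJournal)
}
```

Implementing the Java journal as a thin wrapper keeps the query logic in one place.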

final class DeletedDurableState[A](
  val persistenceId: String,
  val revision: Long,
  override val offset: Offset,
  val timestamp: Long
) extends DurableStateChange[A]

object TimestampOffset {
  val Zero: TimestampOffset
  def apply(timestamp: Instant, seen: Map[String, Long]): TimestampOffset
  def toTimestampOffset(offset: Offset): TimestampOffset
}

import java.util.Optional
import java.time.Instant
import scala.collection.immutable