CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-persistence-query-2-13

Universal asynchronous stream-based query interface for querying persisted events and state changes from various journal backends with support for CQRS and event sourcing patterns.

Overview
Eval results
Files

offsets.mddocs/

Offset Management

Offset management provides position tracking for resumable queries and stream consumption. Different offset types support various backend implementations and consistency requirements.

Capabilities

Offset Base Class

Abstract base class for all offset implementations.

/**
 * Base class for query offsets that track position in event streams
 */
abstract class Offset

Sequence Offset

Ordered sequence number-based offset for strictly ordered backends.

/**
 * Corresponds to an ordered sequence number for the events.
 * The offset is exclusive, i.e. the event with the exact same sequence number will not be included
 * in the returned stream.
 */
final case class Sequence(value: Long) extends Offset with Ordered[Sequence] {
  def compare(that: Sequence): Int
}

Usage Examples:

import akka.persistence.query.Sequence

// Create sequence offset
val offset = Sequence(1000L)

// Use in query
readJournal.eventsByTag("user-events", offset)

// Ordering comparison
val offset1 = Sequence(100L)
val offset2 = Sequence(200L)
println(offset1 < offset2) // true

Time-Based UUID Offset

UUID-based offset for time-ordered event storage systems.

/**
 * Corresponds to an ordered unique identifier of the events.
 * The offset is exclusive, i.e. the event with the exact same sequence number will not be included
 * in the returned stream.
 */
final case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID] {
  def compare(other: TimeBasedUUID): Int
}

Usage Examples:

import akka.persistence.query.TimeBasedUUID
import java.util.UUID

// Create time-based UUID offset (must be version 1 UUID)
val timeUuid = UUID.fromString("550e8400-e29b-11d4-a716-446655440000")
val offset = TimeBasedUUID(timeUuid)

// Use in query
readJournal.eventsByTag("order-events", offset)

// Will throw IllegalArgumentException for non-time-based UUIDs
try {
  TimeBasedUUID(UUID.randomUUID()) // Random UUID (version 4)
} catch {
  case _: IllegalArgumentException => println("Must be time-based UUID")
}

Timestamp Offset

Timestamp-based offset with seen sequence number tracking for handling concurrent events.

/**
 * Timestamp based offset. Since there can be several events for the same timestamp it keeps
 * track of what sequence nrs for every persistence id that have been seen at this specific timestamp.
 * 
 * The offset is exclusive, i.e. the event with the exact same sequence number will not be included
 * in the returned stream.
 */
final case class TimestampOffset(
  /** Time when the event was stored, microsecond granularity database timestamp */
  timestamp: Instant, 
  /** Time when the event was read, microsecond granularity database timestamp */
  readTimestamp: Instant, 
  /** List of sequence nrs for every persistence id seen at this timestamp */
  seen: Map[String, Long]
) extends Offset {
  /** Java API */
  def getSeen(): java.util.Map[String, java.lang.Long]
}

Usage Examples:

import akka.persistence.query.TimestampOffset
import java.time.Instant

// Create timestamp offset
val timestamp = Instant.now()
val seen = Map("user-123" -> 5L, "order-456" -> 12L)
val offset = TimestampOffset(timestamp, timestamp, seen)

// Use in query
readJournal.eventsByTag("payment-events", offset)

// Java API usage
val javaSeenMap = offset.getSeen()

No Offset

Marker object for retrieving all events from the beginning.

/**
 * Used when retrieving all events.
 */
case object NoOffset extends Offset {
  /** Java API */
  def getInstance: Offset
}

Usage Examples:

import akka.persistence.query.NoOffset

// Start from beginning
readJournal.eventsByTag("all-events", NoOffset)

// Java API
import akka.persistence.query.NoOffset;
readJournal.eventsByTag("all-events", NoOffset.getInstance());

Offset Factory

Factory methods for creating offset instances.

object Offset {
  /** Get NoOffset instance */
  def noOffset: Offset
  
  /** Create sequence offset from long value */
  def sequence(value: Long): Offset
  
  /** Create time-based UUID offset */
  def timeBasedUUID(uuid: UUID): Offset
  
  /** Create timestamp offset with empty seen map */
  def timestamp(instant: Instant): TimestampOffset
}

Usage Examples:

import akka.persistence.query.Offset
import java.time.Instant
import java.util.UUID

// Factory methods
val noOffset = Offset.noOffset
val seqOffset = Offset.sequence(1000L)
val timeOffset = Offset.timestamp(Instant.now())

// Time-based UUID
val timeUuid = UUID.fromString("550e8400-e29b-11d4-a716-446655440000")
val uuidOffset = Offset.timeBasedUUID(timeUuid)

TimestampOffset Utilities

Utility methods and constants for timestamp offset handling.

object TimestampOffset {
  /** Zero timestamp offset representing epoch */
  val Zero: TimestampOffset
  
  /** Create timestamp offset with given timestamp and seen map */
  def apply(timestamp: Instant, seen: Map[String, Long]): TimestampOffset
  
  /** Try to convert any Offset to TimestampOffset. Epoch timestamp is used for NoOffset. */
  def toTimestampOffset(offset: Offset): TimestampOffset
}

Usage Examples:

import akka.persistence.query.{TimestampOffset, NoOffset, Sequence}
import java.time.Instant

// Zero offset
val zeroOffset = TimestampOffset.Zero

// Convert various offsets
val timestampFromNo = TimestampOffset.toTimestampOffset(NoOffset) // Returns Zero
val timestampFromSeq = try {
  TimestampOffset.toTimestampOffset(Sequence(100L))
} catch {
  case _: IllegalArgumentException => 
    println("Cannot convert Sequence to TimestampOffset")
}

// Create with seen map
val seen = Map("entity-1" -> 10L, "entity-2" -> 5L)
val offset = TimestampOffset(Instant.now(), seen)

Offset Usage Patterns

Query Resumption

Store and restore offsets for resumable event processing:

// Store offset from last processed event
var lastOffset: Offset = NoOffset

readJournal
  .eventsByTag("user-events", lastOffset)
  .runForeach { envelope =>
    // Process event
    processEvent(envelope.event)
    
    // Update stored offset
    lastOffset = envelope.offset
  }

// Later, resume from stored offset
readJournal
  .eventsByTag("user-events", lastOffset)
  .runForeach(processEvent)

Offset Comparison

Compare offsets for ordering (where supported):

val offset1 = Sequence(100L)
val offset2 = Sequence(200L)

if (offset1 < offset2) {
  println("offset1 comes before offset2")
}

// Time-based UUID comparison
val uuid1 = TimeBasedUUID(timeUuid1)
val uuid2 = TimeBasedUUID(timeUuid2)
val ordered = List(uuid2, uuid1).sorted // Uses UUID ordering

Backend-Specific Offset Selection

Choose appropriate offset type based on backend capabilities:

// For sequence-based backends (like LevelDB)
val sequenceOffset = Sequence(lastProcessedSeqNr)

// For timestamp-based backends (like Cassandra)
val timestampOffset = TimestampOffset(lastProcessedTime, seenSequenceNrs)

// For UUID-based backends
val uuidOffset = TimeBasedUUID(lastProcessedUuid)

Error Handling

Offset Validation

// TimeBasedUUID validation
try {
  val offset = TimeBasedUUID(someUuid)
} catch {
  case e: IllegalArgumentException => 
    println(s"Invalid UUID: ${e.getMessage}")
}

// TimestampOffset conversion
try {
  val converted = TimestampOffset.toTimestampOffset(someOffset)
} catch {
  case e: IllegalArgumentException =>
    println(s"Cannot convert offset: ${e.getMessage}")
}

Offset Compatibility

Different backends support different offset types:

  • LevelDB: Primarily Sequence offsets
  • Cassandra: TimestampOffset with microsecond precision
  • Custom backends: May implement any offset type

Types

trait Ordered[A] {
  def compare(that: A): Int
}

case class Range(start: Int, end: Int)

sealed trait UUIDComparator {
  def compare(uuid1: UUID, uuid2: UUID): Int
}

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-query-2-13

docs

durable-state.md

event-envelopes.md

extension.md

index.md

journal-implementations.md

offsets.md

typed-queries.md

untyped-queries.md

tile.json