Universal asynchronous stream-based query interface for querying persisted events and state changes from various journal backends with support for CQRS and event sourcing patterns.
Offset management provides position tracking for resumable queries and stream consumption. Different offset types support various backend implementations and consistency requirements.
Abstract base class for all offset implementations.
/**
* Base class for query offsets that track position in event streams
*/
abstract class OffsetOrdered sequence number-based offset for strictly ordered backends.
/**
* Corresponds to an ordered sequence number for the events.
* The offset is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream.
*/
final case class Sequence(value: Long) extends Offset with Ordered[Sequence] {
def compare(that: Sequence): Int
}Usage Examples:
import akka.persistence.query.Sequence
// Create sequence offset
val offset = Sequence(1000L)
// Use in query
readJournal.eventsByTag("user-events", offset)
// Ordering comparison
val offset1 = Sequence(100L)
val offset2 = Sequence(200L)
println(offset1 < offset2) // trueUUID-based offset for time-ordered event storage systems.
/**
* Corresponds to an ordered unique identifier of the events.
* The offset is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream.
*/
final case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID] {
def compare(other: TimeBasedUUID): Int
}Usage Examples:
import akka.persistence.query.TimeBasedUUID
import java.util.UUID
// Create time-based UUID offset (must be version 1 UUID)
val timeUuid = UUID.fromString("550e8400-e29b-11d4-a716-446655440000")
val offset = TimeBasedUUID(timeUuid)
// Use in query
readJournal.eventsByTag("order-events", offset)
// Will throw IllegalArgumentException for non-time-based UUIDs
try {
TimeBasedUUID(UUID.randomUUID()) // Random UUID (version 4)
} catch {
case _: IllegalArgumentException => println("Must be time-based UUID")
}Timestamp-based offset with seen sequence number tracking for handling concurrent events.
/**
* Timestamp based offset. Since there can be several events for the same timestamp it keeps
* track of what sequence nrs for every persistence id that have been seen at this specific timestamp.
*
* The offset is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream.
*/
final case class TimestampOffset(
/** Time when the event was stored, microsecond granularity database timestamp */
timestamp: Instant,
/** Time when the event was read, microsecond granularity database timestamp */
readTimestamp: Instant,
/** List of sequence nrs for every persistence id seen at this timestamp */
seen: Map[String, Long]
) extends Offset {
/** Java API */
def getSeen(): java.util.Map[String, java.lang.Long]
}Usage Examples:
import akka.persistence.query.TimestampOffset
import java.time.Instant
// Create timestamp offset
val timestamp = Instant.now()
val seen = Map("user-123" -> 5L, "order-456" -> 12L)
val offset = TimestampOffset(timestamp, timestamp, seen)
// Use in query
readJournal.eventsByTag("payment-events", offset)
// Java API usage
val javaSeenMap = offset.getSeen()Marker object for retrieving all events from the beginning.
/**
* Used when retrieving all events.
*/
case object NoOffset extends Offset {
/** Java API */
def getInstance: Offset
}Usage Examples:
import akka.persistence.query.NoOffset
// Start from beginning
readJournal.eventsByTag("all-events", NoOffset)
// Java API
import akka.persistence.query.NoOffset;
readJournal.eventsByTag("all-events", NoOffset.getInstance());Factory methods for creating offset instances.
object Offset {
/** Get NoOffset instance */
def noOffset: Offset
/** Create sequence offset from long value */
def sequence(value: Long): Offset
/** Create time-based UUID offset */
def timeBasedUUID(uuid: UUID): Offset
/** Create timestamp offset with empty seen map */
def timestamp(instant: Instant): TimestampOffset
}Usage Examples:
import akka.persistence.query.Offset
import java.time.Instant
import java.util.UUID
// Factory methods
val noOffset = Offset.noOffset
val seqOffset = Offset.sequence(1000L)
val timeOffset = Offset.timestamp(Instant.now())
// Time-based UUID
val timeUuid = UUID.fromString("550e8400-e29b-11d4-a716-446655440000")
val uuidOffset = Offset.timeBasedUUID(timeUuid)Utility methods and constants for timestamp offset handling.
object TimestampOffset {
/** Zero timestamp offset representing epoch */
val Zero: TimestampOffset
/** Create timestamp offset with given timestamp and seen map */
def apply(timestamp: Instant, seen: Map[String, Long]): TimestampOffset
/** Try to convert any Offset to TimestampOffset. Epoch timestamp is used for NoOffset. */
def toTimestampOffset(offset: Offset): TimestampOffset
}Usage Examples:
import akka.persistence.query.{TimestampOffset, NoOffset, Sequence}
import java.time.Instant
// Zero offset
val zeroOffset = TimestampOffset.Zero
// Convert various offsets
val timestampFromNo = TimestampOffset.toTimestampOffset(NoOffset) // Returns Zero
val timestampFromSeq = try {
TimestampOffset.toTimestampOffset(Sequence(100L))
} catch {
case _: IllegalArgumentException =>
println("Cannot convert Sequence to TimestampOffset")
}
// Create with seen map
val seen = Map("entity-1" -> 10L, "entity-2" -> 5L)
val offset = TimestampOffset(Instant.now(), seen)Store and restore offsets for resumable event processing:
// Store offset from last processed event
var lastOffset: Offset = NoOffset
readJournal
.eventsByTag("user-events", lastOffset)
.runForeach { envelope =>
// Process event
processEvent(envelope.event)
// Update stored offset
lastOffset = envelope.offset
}
// Later, resume from stored offset
readJournal
.eventsByTag("user-events", lastOffset)
.runForeach(processEvent)Compare offsets for ordering (where supported):
val offset1 = Sequence(100L)
val offset2 = Sequence(200L)
if (offset1 < offset2) {
println("offset1 comes before offset2")
}
// Time-based UUID comparison
val uuid1 = TimeBasedUUID(timeUuid1)
val uuid2 = TimeBasedUUID(timeUuid2)
val ordered = List(uuid2, uuid1).sorted // Uses UUID orderingChoose appropriate offset type based on backend capabilities:
// For sequence-based backends (like LevelDB)
val sequenceOffset = Sequence(lastProcessedSeqNr)
// For timestamp-based backends (like Cassandra)
val timestampOffset = TimestampOffset(lastProcessedTime, seenSequenceNrs)
// For UUID-based backends
val uuidOffset = TimeBasedUUID(lastProcessedUuid)// TimeBasedUUID validation
try {
val offset = TimeBasedUUID(someUuid)
} catch {
case e: IllegalArgumentException =>
println(s"Invalid UUID: ${e.getMessage}")
}
// TimestampOffset conversion
try {
val converted = TimestampOffset.toTimestampOffset(someOffset)
} catch {
case e: IllegalArgumentException =>
println(s"Cannot convert offset: ${e.getMessage}")
}Different backends support different offset types:
Sequence offsetsTimestampOffset with microsecond precisiontrait Ordered[A] {
def compare(that: A): Int
}
case class Range(start: Int, end: Int)
sealed trait UUIDComparator {
def compare(uuid1: UUID, uuid2: UUID): Int
}Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-query-2-13