Untyped Query API

The untyped (original) query API provides standardized interfaces for querying persisted events and persistence IDs. These traits define optional capabilities that read journal implementations may support.

Capabilities

Base Read Journal

Base marker trait for all Scala read journal implementations.

/**
 * API for reading persistent events and information derived from stored persistent events.
 *
 * The purpose of the API is not to enforce compatibility between different
 * journal implementations, because the technical capabilities may be very different.
 * The interface is very open so that different journals may implement specific queries.
 */
trait ReadJournal
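
The usage examples in this document call a hypothetical `getMyReadJournal()` helper. As a minimal sketch of what such a helper might do, the snippet below looks a read journal up through the `PersistenceQuery` extension and checks for one capability. The plugin id `"my-journal"` is an assumption borrowed from the Java example in this document, and `ReadJournalLookup` is an illustrative name, not part of Akka.

```scala
import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.{EventsByPersistenceIdQuery, ReadJournal}

object ReadJournalLookup {
  // Assumed plugin id; it must match a read journal plugin configured for the system.
  val PluginId = "my-journal"

  // Returns the journal only if it supports eventsByPersistenceId.
  def eventsByPersistenceIdQuery(system: ActorSystem): Option[EventsByPersistenceIdQuery] =
    PersistenceQuery(system).readJournalFor[ReadJournal](PluginId) match {
      case query: EventsByPersistenceIdQuery => Some(query)
      case _                                 => None
    }
}
```

Returning an `Option` keeps the capability check in one place, so callers do not need casts.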

Events by Persistence ID

Query events for a specific persistent actor by its persistence ID.

/**
 * A plugin may optionally support this query by implementing this trait.
 */
trait EventsByPersistenceIdQuery extends ReadJournal {
  /**
   * Query events for a specific PersistentActor identified by persistenceId.
   *
   * You can retrieve a subset of all events by specifying fromSequenceNr and toSequenceNr
   * or use 0L and Long.MaxValue respectively to retrieve all events. The query will
   * return all the events inclusive of the fromSequenceNr and toSequenceNr values.
   *
   * The returned event stream should be ordered by sequence number.
   *
   * The stream is not completed when it reaches the end of the currently stored events,
   * but it continues to push new events when new events are persisted.
   */
  def eventsByPersistenceId(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long
  ): Source[EventEnvelope, NotUsed]
}

Usage Examples:

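import akka.NotUsed
import akka.persistence.query.EventEnvelope
import akka.persistence.query.scaladsl.{EventsByPersistenceIdQuery, ReadJournal}
import akka.stream.scaladsl.{Sink, Source}

// An implicit ActorSystem (or Materializer) must be in scope to run the streams below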

// Implement the query trait
class MyReadJournal extends ReadJournal with EventsByPersistenceIdQuery {
  def eventsByPersistenceId(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long
  ): Source[EventEnvelope, NotUsed] = {
    // Implementation specific logic
    Source.empty
  }
}

// Use the query
val readJournal: EventsByPersistenceIdQuery = getMyReadJournal()

// Query all events for a persistence ID (live stream)
readJournal
  .eventsByPersistenceId("user-123", 0L, Long.MaxValue)
  .runWith(Sink.foreach { envelope =>
    println(s"Event: ${envelope.event} at sequence ${envelope.sequenceNr}")
  })

// Query specific sequence number range
readJournal
  .eventsByPersistenceId("order-456", 10L, 50L)
  .runForeach { envelope =>
    processEvent(envelope.event)
  }

Current Events by Persistence ID

Query current (finite) events for a specific persistent actor.

/**
 * A plugin may optionally support this query by implementing this trait.
 */
trait CurrentEventsByPersistenceIdQuery extends ReadJournal {
  /**
   * Same type of query as EventsByPersistenceIdQuery#eventsByPersistenceId but the event stream
   * is completed immediately when it reaches the end of the currently stored events.
   */
  def currentEventsByPersistenceId(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long
  ): Source[EventEnvelope, NotUsed]
}

Usage Examples:

import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
import scala.util.{Failure, Success}

// onComplete requires an implicit ExecutionContext in scope

// Query current events (finite stream)
val readJournal: CurrentEventsByPersistenceIdQuery = getMyReadJournal()

readJournal
  .currentEventsByPersistenceId("user-123", 0L, Long.MaxValue)
  .runForeach { envelope =>
    println(s"Historical event: ${envelope.event}")
  }
  .onComplete {
    case Success(_) => println("Finished processing historical events")
    case Failure(ex) => println(s"Failed: $ex")
  }
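
Because a "current" query completes, its result can be folded into a single value. The following is a minimal sketch of rebuilding state that way. `Replay`, `zero`, and `step` are hypothetical names introduced for illustration, and the caller supplies how each event changes the state.

```scala
import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
import akka.stream.Materializer
import scala.concurrent.Future

object Replay {
  // Folds every currently stored event of one persistence ID into a state value.
  // The returned Future completes once the finite stream has been fully consumed.
  def replay[S](journal: CurrentEventsByPersistenceIdQuery, persistenceId: String, zero: S)(
      step: (S, Any) => S)(implicit mat: Materializer): Future[S] =
    journal
      .currentEventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
      .runFold(zero)((state, envelope) => step(state, envelope.event))
}
```

For example, `Replay.replay(readJournal, "user-123", 0)((count, _) => count + 1)` would count the stored events. The same fold on the live `eventsByPersistenceId` query would never complete.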

Events by Tag

Query events that have a specific tag across all persistence IDs.

/**
 * A plugin may optionally support this query by implementing this trait.
 */
trait EventsByTagQuery extends ReadJournal {
  /**
   * Query events that have a specific tag. A tag can for example correspond to an
   * aggregate root type (in DDD terminology).
   *
   * The consumer can keep track of its current position in the event stream by storing the
   * offset and restart the query from a given offset after a crash/restart.
   *
   * The returned event stream should be ordered by offset if possible, but this can also be
   * difficult to fulfill for a distributed data store. The order must be documented by the
   * read journal plugin.
   *
   * The stream is not completed when it reaches the end of the currently stored events,
   * but it continues to push new events when new events are persisted.
   */
  def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}

Usage Examples:

import akka.persistence.query.scaladsl.EventsByTagQuery
import akka.persistence.query.{NoOffset, Sequence}

val readJournal: EventsByTagQuery = getMyReadJournal()

// Query events by tag from beginning (live stream)
readJournal
  .eventsByTag("user-events", NoOffset)
  .runForeach { envelope =>
    println(s"Tagged event: ${envelope.event} from ${envelope.persistenceId}")
  }

// Resume from specific offset
val lastProcessedOffset = Sequence(1000L)
readJournal
  .eventsByTag("order-events", lastProcessedOffset)
  .runForeach { envelope =>
    processTaggedEvent(envelope)
    saveOffset(envelope.offset) // Save for resumption
  }

Current Events by Tag

Query current (finite) events that have a specific tag.

/**
 * A plugin may optionally support this query by implementing this trait.
 */
trait CurrentEventsByTagQuery extends ReadJournal {
  /**
   * Same type of query as EventsByTagQuery#eventsByTag but the event stream
   * is completed immediately when it reaches the end of the currently stored events.
   */
  def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}

Usage Examples:

import akka.persistence.query.NoOffset
import akka.persistence.query.scaladsl.CurrentEventsByTagQuery
import scala.util.{Failure, Success}

val readJournal: CurrentEventsByTagQuery = getMyReadJournal()

// Process all current events with tag (finite stream)
readJournal
  .currentEventsByTag("batch-process", NoOffset)
  .runForeach { envelope =>
    processBatchEvent(envelope.event)
  }
  .onComplete {
    case Success(_) => println("Batch processing complete")
    case Failure(ex) => println(s"Batch processing failed: $ex")
  }

Persistence IDs Query

Query all persistence IDs in the journal.

/**
 * A plugin may optionally support this query by implementing this trait.
 */
trait PersistenceIdsQuery extends ReadJournal {
  /**
   * Query all PersistentActor identifiers, i.e. as defined by the
   * persistenceId of the PersistentActor.
   *
   * The stream is not completed when it reaches the end of the currently used persistenceIds,
   * but it continues to push new persistenceIds when new persistent actors are created.
   */
  def persistenceIds(): Source[String, NotUsed]
}

Usage Examples:

import akka.persistence.query.scaladsl.{EventsByPersistenceIdQuery, PersistenceIdsQuery}

val readJournal: PersistenceIdsQuery = getMyReadJournal()

// Get all persistence IDs (live stream)
readJournal
  .persistenceIds()
  .runForeach { persistenceId =>
    println(s"Found persistence ID: $persistenceId")

    // Query events for each persistence ID, but only if the journal supports it
    readJournal match {
      case byId: EventsByPersistenceIdQuery =>
        byId
          .eventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
          .take(10) // Limit to first 10 events; stays open until 10 events exist
          .runForeach { envelope =>
            println(s"  Event: ${envelope.event}")
          }
      case _ =>
        println("  Events by persistence ID queries are not supported")
    }
  }

Current Persistence IDs Query

Query all current (finite) persistence IDs in the journal.

/**
 * A plugin may optionally support this query by implementing this trait.
 */
trait CurrentPersistenceIdsQuery extends ReadJournal {
  /**
   * Same type of query as PersistenceIdsQuery#persistenceIds but the stream
   * is completed immediately when it reaches the end of the currently used persistenceIds.
   */
  def currentPersistenceIds(): Source[String, NotUsed]
}

Usage Examples:

import akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery
import scala.util.{Failure, Success}

val readJournal: CurrentPersistenceIdsQuery = getMyReadJournal()

// Get all current persistence IDs (finite stream)
readJournal
  .currentPersistenceIds()
  .runForeach { persistenceId =>
    println(s"Current persistence ID: $persistenceId")
  }
  .onComplete {
    case Success(_) => println("Finished scanning persistence IDs")
    case Failure(ex) => println(s"Scan failed: $ex")
  }

Paged Persistence IDs Query

Query persistence IDs with pagination support for large datasets.

/**
 * A plugin ReadJournal may optionally support this query by implementing this trait.
 */
trait PagedPersistenceIdsQuery extends ReadJournal {
  /**
   * Get the current persistence ids.
   *
   * Not all plugins may support in database paging, and may simply use drop/take Akka streams operators
   * to manipulate the result set according to the paging parameters.
   *
   * @param afterId The ID to start returning results from, or None to return all ids. This should be an id
   *                returned from a previous invocation of this command. Callers should not assume that ids are
   *                returned in sorted order.
   * @param limit The maximum results to return. Use Long.MaxValue to return all results. Must be greater than zero.
   * @return A source containing all the persistence ids, limited as specified.
   */
  def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]
}

Usage Examples:

import akka.persistence.query.scaladsl.PagedPersistenceIdsQuery
import akka.stream.scaladsl.Sink

val readJournal: PagedPersistenceIdsQuery = getMyReadJournal()

// Get first page of persistence IDs
readJournal
  .currentPersistenceIds(None, 100)
  .runWith(Sink.seq) // Collect the page into an immutable Seq
  .map { firstPage =>
    println(s"First page: ${firstPage.mkString(", ")}")
    
    // Get next page starting from last ID
    if (firstPage.nonEmpty) {
      val lastId = firstPage.last
      readJournal
        .currentPersistenceIds(Some(lastId), 100)
        .runForeach { nextId =>
          println(s"Next page ID: $nextId")
        }
    }
  }

// Get all persistence IDs with unlimited results
readJournal
  .currentPersistenceIds(None, Long.MaxValue)
  .runForeach { persistenceId =>
    println(s"Persistence ID: $persistenceId")
  }
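
Walking through every page can be expressed as a small recursive loop. This is a minimal sketch under two assumptions: the plugin returns only ids that come after `afterId` (exclusive), and a page shorter than the limit means there are no more results. `PagedScan` and `collectAll` are hypothetical names, not part of Akka.

```scala
import akka.persistence.query.scaladsl.PagedPersistenceIdsQuery
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import scala.concurrent.{ExecutionContext, Future}

object PagedScan {
  // Fetches pages of at most `pageSize` ids until a short or empty page is seen.
  def collectAll(journal: PagedPersistenceIdsQuery, pageSize: Long)(
      implicit mat: Materializer, ec: ExecutionContext): Future[Vector[String]] = {

    def loop(afterId: Option[String], acc: Vector[String]): Future[Vector[String]] =
      journal.currentPersistenceIds(afterId, pageSize).runWith(Sink.seq).flatMap { page =>
        val collected = acc ++ page
        if (page.size < pageSize) Future.successful(collected) // last page reached
        else loop(Some(page.last), collected)                  // continue after the last id seen
      }

    loop(None, Vector.empty)
  }
}
```

Collecting everything into memory is only reasonable for modest id counts. For large journals, processing each page as it arrives is likely the better design.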

Types

import akka.NotUsed
import akka.stream.scaladsl.Source

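// Simplified view of scala.Product4 from the Scala standard library
trait Product4[T1, T2, T3, T4] {
  def _1: T1
  def _2: T2
  def _3: T3
  def _4: T4
}

// Simplified view of akka.NotUsed, the materialized value type of the query sources above
case object NotUsed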

Query Composition

Multiple Query Traits

Combine multiple query capabilities in a single read journal:

class ComprehensiveReadJournal extends ReadJournal 
  with EventsByPersistenceIdQuery
  with CurrentEventsByPersistenceIdQuery
  with EventsByTagQuery
  with CurrentEventsByTagQuery
  with PersistenceIdsQuery
  with CurrentPersistenceIdsQuery {
  
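  // Implement all query methods (??? stands in for plugin-specific logic)
  def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] = ???
  def currentEventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] = ???
  def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = ???
  def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = ???
  def persistenceIds(): Source[String, NotUsed] = ???
  def currentPersistenceIds(): Source[String, NotUsed] = ???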
}

Capability Detection

Check if a read journal supports specific query capabilities:

def checkCapabilities(readJournal: ReadJournal): Unit = {
  readJournal match {
    case _: EventsByPersistenceIdQuery =>
      println("Supports events by persistence ID queries")
    case _ =>
      println("Does not support events by persistence ID queries")
  }
  
  readJournal match {
    case _: EventsByTagQuery =>
      println("Supports events by tag queries")
    case _ =>
      println("Does not support events by tag queries")
  }
  
  readJournal match {
    case _: PersistenceIdsQuery =>
      println("Supports persistence IDs queries")
    case _ =>
      println("Does not support persistence IDs queries")
  }
}

Error Handling

Query Failures

Handle failures in query streams:

readJournal
  .eventsByTag("user-events", offset)
  .recoverWithRetries(attempts = 1, {
    case _: TimeoutException =>
      println("Query timed out, completing the stream")
      Source.empty[EventEnvelope] // Complete normally instead of failing
  })
  .runForeach { envelope =>
    try {
      processEvent(envelope.event)
    } catch {
      case ex: Exception =>
        println(s"Failed to process event: $ex")
    }
  }
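
To actually restart a live query after a failure, one option is Akka Streams' `RestartSource`, resuming from the last offset seen. This is a minimal sketch that assumes Akka 2.6.10 or later (for `RestartSettings`) and a plugin that accepts an emitted offset as the resume point. `ResumableTagQuery` is a hypothetical name, and the backoff values are arbitrary.

```scala
import akka.NotUsed
import akka.persistence.query.{EventEnvelope, Offset}
import akka.persistence.query.scaladsl.EventsByTagQuery
import akka.stream.RestartSettings
import akka.stream.scaladsl.{RestartSource, Source}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._

object ResumableTagQuery {
  // Wraps eventsByTag so that a failed stream is re-created with exponential backoff,
  // starting from the offset of the last envelope that passed through.
  def resumable(journal: EventsByTagQuery, tag: String, start: Offset): Source[EventEnvelope, NotUsed] = {
    // In-memory only: this position is lost if the process itself crashes.
    val lastSeen = new AtomicReference[Offset](start)
    val settings = RestartSettings(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2)

    RestartSource.withBackoff(settings) { () =>
      journal.eventsByTag(tag, lastSeen.get()).map { envelope =>
        lastSeen.set(envelope.offset)
        envelope
      }
    }
  }
}
```

Whether the event at the resume offset is delivered again depends on the plugin, so downstream processing should tolerate duplicates. The offset holder is shared by all materializations of the returned source, so it is best materialized once.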

Offset Validation

Validate offsets before using in queries:

def validateOffset(offset: Offset): Boolean = {
  offset match {
    case NoOffset => true
    case Sequence(value) => value >= 0
    case TimestampOffset(timestamp, _, _) => !timestamp.isBefore(Instant.EPOCH)
    case _ => false
  }
}

def safeEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
  if (validateOffset(offset)) {
    readJournal.eventsByTag(tag, offset)
  } else {
    Source.failed(new IllegalArgumentException(s"Invalid offset: $offset"))
  }
}

Java API

All Scala query traits have corresponding Java API equivalents in the javadsl package:

// Java API usage
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.javadsl.*;

// Get Java read journal
ReadJournal readJournal = PersistenceQuery.get(system)
    .getReadJournalFor(MyJavaReadJournal.class, "my-journal");

// Check capabilities and use
if (readJournal instanceof EventsByPersistenceIdQuery) {
    EventsByPersistenceIdQuery query = (EventsByPersistenceIdQuery) readJournal;
    query.eventsByPersistenceId("user-123", 0L, Long.MAX_VALUE)
         .runForeach(envelope -> {
             System.out.println("Event: " + envelope.event());
         }, system);
}

Types

import akka.stream.scaladsl.Source
import akka.NotUsed
import scala.concurrent.Future
import scala.util.{Success, Failure}

// Simplified view; the JDK defines java.util.concurrent.TimeoutException as a class
class TimeoutException extends Exception