or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

durable-state.mdevent-envelopes.mdextension.mdindex.mdjournal-implementations.mdoffsets.mdtyped-queries.mduntyped-queries.md
tile.json

extension.mddocs/

Extension and Configuration

The PersistenceQuery extension provides the main entry point for obtaining read journal instances from configured plugins. It manages plugin lifecycle and provides both Scala and Java APIs.

Capabilities

PersistenceQuery Extension

Main actor system extension for persistence query functionality.

/**
 * Persistence extension for queries.
 */
object PersistenceQuery extends ExtensionId[PersistenceQuery] with ExtensionIdProvider {
  /** Get the extension instance for the given actor system */
  def get(system: ActorSystem): PersistenceQuery
  /** Get the extension instance for the given classic actor system provider */
  def get(system: ClassicActorSystemProvider): PersistenceQuery
  /** Lookup method for extension resolution */
  def lookup: PersistenceQuery.type
}

class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
  /**
   * Scala API: Returns the ReadJournal specified by the given read journal configuration entry
   * @param readJournalPluginId Plugin identifier from configuration
   * @return Configured read journal instance
   */
  def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String): T
  
  /**
   * Scala API: Returns the ReadJournal with custom configuration
   * @param readJournalPluginId Plugin identifier
   * @param readJournalPluginConfig Custom plugin configuration
   * @return Configured read journal instance
   */
  def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T
  
  /**
   * Java API: Returns the ReadJournal specified by the given read journal configuration entry
   * @param clazz Class of the read journal implementation
   * @param readJournalPluginId Plugin identifier from configuration
   * @return Configured read journal instance
   */
  def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T
  
  /**
   * Java API: Returns the ReadJournal with custom configuration
   * @param clazz Class of the read journal implementation
   * @param readJournalPluginId Plugin identifier
   * @param readJournalPluginConfig Custom plugin configuration
   * @return Configured read journal instance
   */
  def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String, readJournalPluginConfig: Config): T
}

Usage Examples:

import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal

implicit val system: ActorSystem = ActorSystem("example")

// Get read journal using plugin identifier
val readJournal = PersistenceQuery(system)
  .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

// Get read journal with custom configuration
import com.typesafe.config.ConfigFactory
val customConfig = ConfigFactory.parseString("""
  akka.persistence.query.journal.leveldb {
    class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"
    write-plugin = "akka.persistence.journal.leveldb"
    dir = "target/custom-journal"
  }
""")

val customReadJournal = PersistenceQuery(system)
  .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier, customConfig)

Java API usage:

import akka.actor.ActorSystem;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;

ActorSystem system = ActorSystem.create("example");

// Get read journal using plugin identifier
LeveldbReadJournal readJournal = PersistenceQuery.get(system)
    .getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier);

ReadJournalProvider Interface

Plugin provider interface that read journal implementations must implement.

/**
 * A query plugin must implement a class that implements this trait.
 * It provides the concrete implementations for the Java and Scala APIs.
 */
trait ReadJournalProvider {
  /**
   * The ReadJournal implementation for the Scala API.
   * This corresponds to the instance that is returned by PersistenceQuery#readJournalFor.
   */
  def scaladslReadJournal(): scaladsl.ReadJournal
  
  /**
   * The ReadJournal implementation for the Java API.
   * This corresponds to the instance that is returned by PersistenceQuery#getReadJournalFor.
   */
  def javadslReadJournal(): javadsl.ReadJournal
}

Implementation Example:

class MyReadJournalProvider extends ReadJournalProvider {
  override def scaladslReadJournal(): scaladsl.ReadJournal = {
    new MyScalaReadJournal()
  }
  
  override def javadslReadJournal(): javadsl.ReadJournal = {
    new MyJavaReadJournal()
  }
}

Configuration

Plugin Configuration

Read journal plugins are configured in application.conf:

# Example LevelDB read journal configuration
akka.persistence.query.journal.leveldb {
  # Implementation class for the plugin
  class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"
  
  # Reference to the write journal plugin
  write-plugin = "akka.persistence.journal.leveldb"
  
  # Directory where journal files are stored
  dir = "target/journal"
  
  # Maximum number of events to replay
  max-buffer-size = 100
}

Plugin Identifier Usage

Each read journal plugin has a unique identifier:

// Built-in plugin identifiers
val leveldbId = "akka.persistence.query.journal.leveldb"
val firehoseId = "akka.persistence.query.events-by-slice-firehose"

// Using with PersistenceQuery
val journal = PersistenceQuery(system).readJournalFor[MyReadJournal](leveldbId)

Custom Plugin Registration

Register custom plugins in configuration:

my-read-journal {
  class = "com.example.MyReadJournalProvider"
  # Additional plugin-specific configuration
  connection-timeout = 5s
  batch-size = 1000
}

Then use with PersistenceQuery:

val customJournal = PersistenceQuery(system)
  .readJournalFor[MyReadJournal]("my-read-journal")

Error Handling

Common Configuration Errors

  • ClassNotFoundException: Plugin class not found in classpath
  • ConfigurationException: Missing or invalid plugin configuration
  • IllegalArgumentException: Invalid plugin identifier

Plugin Resolution

The extension uses the following resolution strategy:

  1. Look up plugin configuration by identifier
  2. Load and instantiate the provider class
  3. Cache the instance for future requests
  4. Return the appropriate Scala or Java DSL implementation

Types

trait ExtensionId[T <: Extension] {
  def createExtension(system: ExtendedActorSystem): T
}

trait ExtensionIdProvider {
  def lookup: ExtensionId[_ <: Extension]
}

trait Extension