The PersistenceQuery extension provides the main entry point for obtaining read journal instances from configured plugins. It manages plugin lifecycle and provides both Scala and Java APIs.
Main actor system extension for persistence query functionality.
/**
* Persistence extension for queries.
*/
object PersistenceQuery extends ExtensionId[PersistenceQuery] with ExtensionIdProvider {
/** Get the extension instance for the given actor system */
def get(system: ActorSystem): PersistenceQuery
/** Get the extension instance for the given classic actor system provider */
def get(system: ClassicActorSystemProvider): PersistenceQuery
/** Lookup method for extension resolution */
def lookup: PersistenceQuery.type
}
class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
/**
* Scala API: Returns the ReadJournal specified by the given read journal configuration entry
* @param readJournalPluginId Plugin identifier from configuration
* @return Configured read journal instance
*/
def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String): T
/**
* Scala API: Returns the ReadJournal with custom configuration
* @param readJournalPluginId Plugin identifier
* @param readJournalPluginConfig Custom plugin configuration
* @return Configured read journal instance
*/
def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T
/**
* Java API: Returns the ReadJournal specified by the given read journal configuration entry
* @param clazz Class of the read journal implementation
* @param readJournalPluginId Plugin identifier from configuration
* @return Configured read journal instance
*/
def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T
/**
* Java API: Returns the ReadJournal with custom configuration
* @param clazz Class of the read journal implementation
* @param readJournalPluginId Plugin identifier
* @param readJournalPluginConfig Custom plugin configuration
* @return Configured read journal instance
*/
def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String, readJournalPluginConfig: Config): T
}

Usage Examples:
import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
implicit val system: ActorSystem = ActorSystem("example")
// Get read journal using plugin identifier
val readJournal = PersistenceQuery(system)
.readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
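// Sketch, not part of the original example: once obtained, a read journal is
// queried through its stream-returning methods. "user-1" is a hypothetical
// persistence id. Running the stream relies on the implicit ActorSystem above
// to supply a materializer (assumes Akka 2.6 or later).
import akka.NotUsed
import akka.persistence.query.EventEnvelope
import akka.stream.scaladsl.Source
val userEvents: Source[EventEnvelope, NotUsed] =
readJournal.eventsByPersistenceId("user-1", 0L, Long.MaxValue)
userEvents.runForeach(envelope => println(s"${envelope.sequenceNr}: ${envelope.event}"))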
// Get read journal with custom configuration
import com.typesafe.config.ConfigFactory
val customConfig = ConfigFactory.parseString("""
akka.persistence.query.journal.leveldb {
class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"
write-plugin = "akka.persistence.journal.leveldb"
dir = "target/custom-journal"
}
""")
val customReadJournal = PersistenceQuery(system)
.readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier, customConfig)

Java API usage:
import akka.actor.ActorSystem;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
ActorSystem system = ActorSystem.create("example");
// Get read journal using plugin identifier
LeveldbReadJournal readJournal = PersistenceQuery.get(system)
.getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier);

Plugin provider interface that read journal implementations must implement.
/**
* A query plugin must implement a class that implements this trait.
* It provides the concrete implementations for the Java and Scala APIs.
*/
trait ReadJournalProvider {
/**
* The ReadJournal implementation for the Scala API.
* This corresponds to the instance that is returned by PersistenceQuery#readJournalFor.
*/
def scaladslReadJournal(): scaladsl.ReadJournal
/**
* The ReadJournal implementation for the Java API.
* This corresponds to the instance that is returned by PersistenceQuery#getReadJournalFor.
*/
def javadslReadJournal(): javadsl.ReadJournal
}

Implementation Example:
class MyReadJournalProvider extends ReadJournalProvider {
override def scaladslReadJournal(): scaladsl.ReadJournal = {
new MyScalaReadJournal()
}
override def javadslReadJournal(): javadsl.ReadJournal = {
new MyJavaReadJournal()
}
}

Read journal plugins are configured in application.conf:
# Example LevelDB read journal configuration
akka.persistence.query.journal.leveldb {
# Implementation class for the plugin
class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"
# Reference to the write journal plugin
write-plugin = "akka.persistence.journal.leveldb"
# Directory where journal files are stored
dir = "target/journal"
# Maximum number of events fetched per replay query and kept buffered until delivered downstream
max-buffer-size = 100
}

Each read journal plugin has a unique identifier:
// Built-in plugin identifiers
val leveldbId = "akka.persistence.query.journal.leveldb"
val firehoseId = "akka.persistence.query.events-by-slice-firehose"
// Using with PersistenceQuery
// The type parameter must match the journal type that the plugin actually provides
val journal = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](leveldbId)

Register custom plugins in configuration:
my-read-journal {
class = "com.example.MyReadJournalProvider"
# Additional plugin-specific configuration
connection-timeout = 5s
batch-size = 1000
}

Then use with PersistenceQuery:
val customJournal = PersistenceQuery(system)
.readJournalFor[MyReadJournal]("my-read-journal")

The extension uses the following resolution strategy:
trait ExtensionId[T <: Extension] {
def createExtension(system: ExtendedActorSystem): T
}
trait ExtensionIdProvider {
def lookup: ExtensionId[_ <: Extension]
}
trait Extension
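
To show how the three types above fit together, here is a minimal sketch of a custom extension written the same way PersistenceQuery is structured: a companion object acting as the ExtensionId and a class holding the per-system state. The names `RequestCounter` and `ExtensionDemo` and the system name are hypothetical, and the sketch assumes akka-actor is on the classpath with the `lookup` signature shown above.

```scala
import java.util.concurrent.atomic.AtomicLong

import akka.actor.{ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}

// Hypothetical extension: holds state that is shared across one actor system.
class RequestCounter extends Extension {
  private val counter = new AtomicLong(0L)
  def increment(): Long = counter.incrementAndGet()
}

// The companion object is the ExtensionId, mirroring `object PersistenceQuery`.
object RequestCounter extends ExtensionId[RequestCounter] with ExtensionIdProvider {
  // Lets Akka find this ExtensionId when the extension is loaded from configuration.
  override def lookup: ExtensionId[_ <: Extension] = RequestCounter

  // Called by Akka to create the extension instance for a system.
  override def createExtension(system: ExtendedActorSystem): RequestCounter =
    new RequestCounter
}

object ExtensionDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("extension-demo")
    try {
      val first = RequestCounter(system)
      val second = RequestCounter.get(system) // Java-style accessor, same instance
      assert(first eq second)
      first.increment()
      assert(second.increment() == 2L) // both references share one counter
    } finally {
      system.terminate()
    }
  }
}
```

Because the extension instance is registered per actor system, repeated calls to `PersistenceQuery(system)` or `PersistenceQuery.get(system)` should likewise hand back the same extension object for a given system.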