Akka Persistence Query provides concrete read journal implementations for specific storage backends. These implementations demonstrate how to build read journals and provide working examples for common use cases.
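All of them are obtained the same way, through the PersistenceQuery extension, keyed by the plugin identifier under which the journal is configured. A minimal sketch of the lookup pattern (the plugin id below is a placeholder; each implementation's real identifier is listed in its companion object):
import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.ReadJournal

val system = ActorSystem("docs")
// The type parameter selects which query capabilities the returned journal
// exposes; "some.plugin.id" is a placeholder for a configured plugin section.
val journal: ReadJournal =
  PersistenceQuery(system).readJournalFor[ReadJournal]("some.plugin.id")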
A reference implementation that uses LevelDB as its storage backend. Note that this implementation has been deprecated since Akka 2.6.15.
/**
* LevelDB read journal implementation.
* @deprecated Use another journal implementation since 2.6.15
*/
class LeveldbReadJournal extends ReadJournal
with PersistenceIdsQuery
with CurrentPersistenceIdsQuery
with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery
with EventsByTagQuery
with CurrentEventsByTagQuery {
def persistenceIds(): Source[String, NotUsed]
def currentPersistenceIds(): Source[String, NotUsed]
def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed]
def currentEventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed]
def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}

Factory methods and constants for the LevelDB read journal.
/**
* LevelDB read journal companion object with plugin identifier.
* @deprecated Use another journal implementation since 2.6.15
*/
object LeveldbReadJournal {
/** Plugin identifier for configuration */
val Identifier = "akka.persistence.query.journal.leveldb"
}

Usage Examples:
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
// Note: LevelDB implementation is deprecated
val readJournal = PersistenceQuery(system)
.readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
// Query all persistence IDs
readJournal
.persistenceIds()
.take(10)
.runForeach { persistenceId =>
println(s"Found persistence ID: $persistenceId")
}
// Query events by persistence ID
readJournal
.eventsByPersistenceId("user-123", 0L, Long.MaxValue)
.runForeach { envelope =>
println(s"Event: ${envelope.event} at sequence ${envelope.sequenceNr}")
}
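// The current* variants complete once the known end of the journal is
// reached, which makes them suitable for finite, one-off reads:
readJournal
.currentEventsByPersistenceId("user-123", 0L, Long.MaxValue)
.runWith(Sink.seq)
.foreach { envelopes =>
println(s"Read ${envelopes.size} stored events")
}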
// Query events by tag
readJournal
.eventsByTag("user-events", NoOffset)
.runForeach { envelope =>
println(s"Tagged event: ${envelope.event} from ${envelope.persistenceId}")
}
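// Tag queries only see events that were tagged on the write side, for
// example by wrapping payloads in akka.persistence.journal.Tagged from a
// WriteEventAdapter (a minimal sketch; the adapter must also be bound to
// the write journal in configuration):
//
//   class UserTaggingAdapter extends WriteEventAdapter {
//     override def manifest(event: Any): String = ""
//     override def toJournal(event: Any): Any = Tagged(event, Set("user-events"))
//   }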
// Query current (finite) events
readJournal
.currentEventsByTag("batch-events", NoOffset)
.runForeach { envelope =>
processBatchEvent(envelope.event)
}
.onComplete {
case Success(_) => println("Batch processing complete")
case Failure(ex) => println(s"Batch processing failed: $ex")
}

Provider implementation for the LevelDB read journal plugin.
/**
* LevelDB read journal provider implementation.
* @deprecated Use another journal implementation since 2.6.15
*/
class LeveldbReadJournalProvider extends ReadJournalProvider {
def scaladslReadJournal(): scaladsl.ReadJournal
def javadslReadJournal(): javadsl.ReadJournal
}

An advanced implementation that provides slice-based event queries, along with event timestamp lookup and on-demand event loading.
/**
* Firehose implementation for slice-based event queries.
* Provides high-performance event streaming by sharing one underlying
* slice-distributed event stream across many consumers.
*/
class EventsBySliceFirehoseQuery extends ReadJournal
with EventsBySliceQuery
with EventsBySliceStartingFromSnapshotsQuery
with EventTimestampQuery
with LoadEventQuery {
// EventsBySliceQuery methods
def eventsBySlices[Event](entityType: String, minSlice: Int, maxSlice: Int, offset: Offset): Source[EventEnvelope[Event], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
// EventsBySliceStartingFromSnapshotsQuery methods
def eventsBySlicesStartingFromSnapshots[Snapshot, Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset,
transformSnapshot: Snapshot => Event
): Source[EventEnvelope[Event], NotUsed]
// EventTimestampQuery methods
def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]]
// LoadEventQuery methods
def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): Future[EventEnvelope[Event]]
}

Constants and factory methods for the firehose implementation.
/**
* Firehose query companion object with plugin identifier.
*/
object EventsBySliceFirehoseQuery {
/** Plugin identifier for configuration */
val Identifier = "akka.persistence.query.events-by-slice-firehose"
}

Usage Examples:
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.typed.scaladsl.EventsBySliceFirehoseQuery
// Get firehose read journal
val firehoseJournal = PersistenceQuery(system)
.readJournalFor[EventsBySliceFirehoseQuery](EventsBySliceFirehoseQuery.Identifier)
// Query events by slice range
firehoseJournal
.eventsBySlices[DomainEvent]("User", 0, 127, NoOffset)
.runForeach { envelope =>
println(s"Event from slice ${envelope.slice}: ${envelope.event}")
println(s"Entity type: ${envelope.entityType}")
println(s"Tags: ${envelope.tags}")
}
// Distribute processing across slices
val sliceRanges = firehoseJournal.sliceRanges(8)
sliceRanges.zipWithIndex.foreach { case (range, processorId) =>
println(s"Starting processor $processorId for slices ${range.start}-${range.end}")
firehoseJournal
.eventsBySlices[DomainEvent]("Order", range.start, range.end, offset)
.runForeach { envelope =>
processInProcessor(processorId, envelope)
}
}
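// A persistence id always maps to the same slice, so work for a specific
// entity can be routed to the processor that owns that slice
val slice = firehoseJournal.sliceForPersistenceId("user-123")
println(s"user-123 belongs to slice $slice")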
// Load specific events on demand
firehoseJournal
.loadEnvelope[UserEvent]("user-123", 42L)
.foreach { envelope =>
println(s"Loaded event: ${envelope.event}")
}
// Query event timestamps
firehoseJournal
.timestampOf("user-123", 42L)
.foreach {
case Some(timestamp) => println(s"Event timestamp: $timestamp")
case None => println("Event not found")
}
// Query with snapshot integration
firehoseJournal
.eventsBySlicesStartingFromSnapshots[UserSnapshot, UserEvent](
entityType = "User",
minSlice = 0,
maxSlice = 255,
offset = TimestampOffset.Zero,
transformSnapshot = snapshot => UserSnapshotRestored(snapshot.userId, snapshot.state)
)
.runForeach { envelope =>
envelope.event match {
case UserSnapshotRestored(userId, state) =>
println(s"Restored snapshot for $userId")
initializeUserState(userId, state)
case regularEvent =>
processUserEvent(regularEvent)
}
}

Provider implementation for the firehose read journal.
/**
* Provider for EventsBySliceFirehose query implementation.
*/
class EventsBySliceFirehoseReadJournalProvider extends ReadJournalProvider {
def scaladslReadJournal(): scaladsl.ReadJournal
def javadslReadJournal(): javadsl.ReadJournal
}

Example configuration for the LevelDB read journal (deprecated):
# LevelDB read journal configuration (deprecated)
akka.persistence.query.journal.leveldb {
# Implementation class
class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"
# Reference to the write journal plugin (the storage directory itself is
# configured on the write journal, under akka.persistence.journal.leveldb.dir)
write-plugin = "akka.persistence.journal.leveldb"
# Maximum number of events to buffer
max-buffer-size = 100
# Refresh interval for live queries
refresh-interval = 1s
}

Example configuration for the firehose implementation:
# Firehose read journal configuration
akka.persistence.query.events-by-slice-firehose {
# Implementation class
class = "akka.persistence.query.typed.EventsBySliceFirehoseReadJournalProvider"
# Plugin id of the underlying events-by-slices query implementation
# that the firehose wraps and shares across its consumers
delegate-query-plugin-id = "akka.persistence.r2dbc.query"
}

Example of implementing a custom read journal:
class CustomReadJournal(system: ExtendedActorSystem, config: Config)
extends ReadJournal
with EventsByPersistenceIdQuery
with EventsByTagQuery
with PersistenceIdsQuery {
// Backend abstraction over the datastore; InMemoryJournalBackend (defined
// at the end of this section) is one possible implementation
private val backend: CustomJournalBackend = new InMemoryJournalBackend(config)
import system.dispatcher // execution context for Future transformations
override def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long
): Source[EventEnvelope, NotUsed] = {
Source
.unfoldAsync(fromSequenceNr) { seqNr =>
if (seqNr > toSequenceNr) {
Future.successful(None)
} else {
backend.loadEvent(persistenceId, seqNr).map {
case Some(event) =>
val envelope = EventEnvelope(
offset = Sequence(seqNr),
persistenceId = persistenceId,
sequenceNr = seqNr,
event = event,
timestamp = System.currentTimeMillis()
)
Some((seqNr + 1, envelope))
case None =>
None
}
}
}
}
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
val startOffset = offset match {
case Sequence(value) => value
case NoOffset => 0L
case _ => throw new IllegalArgumentException(s"Unsupported offset type: $offset")
}
Source
.unfoldAsync(startOffset) { currentOffset =>
backend.loadEventsByTag(tag, currentOffset, batchSize = 100).flatMap { events =>
if (events.nonEmpty) {
val lastOffset = events.last.sequenceNr
Future.successful(Some((lastOffset + 1, events)))
} else {
// No new events yet, but this is a live query: poll again after a delay
// without blocking a thread (akka.pattern.after uses the system scheduler)
akka.pattern.after(1.second, system.scheduler) {
Future.successful(Some((currentOffset, List.empty[EventEnvelope])))
}
}
}
}
.mapConcat(identity)
}
override def persistenceIds(): Source[String, NotUsed] = {
Source
.unfoldAsync(Option.empty[String]) { afterId =>
backend.loadPersistenceIds(afterId, limit = 100).flatMap { ids =>
if (ids.nonEmpty) {
Future.successful(Some((ids.lastOption, ids)))
} else {
// No new ids yet; this is a live query, so poll again after a delay
akka.pattern.after(1.second, system.scheduler) {
Future.successful(Some((afterId, List.empty[String])))
}
}
}
}
.mapConcat(identity)
}
}
// Custom provider; Akka instantiates it reflectively, passing the actor
// system and the plugin's config section as constructor arguments
class CustomReadJournalProvider(system: ExtendedActorSystem, config: Config) extends ReadJournalProvider {
private val journal = new CustomReadJournal(system, config)
override def scaladslReadJournal(): scaladsl.ReadJournal = journal
override def javadslReadJournal(): javadsl.ReadJournal = new CustomJavaReadJournal(journal)
}

Register the custom journal implementation in configuration:
# Custom read journal plugin
my-custom-journal {
class = "com.example.CustomReadJournalProvider"
# Custom configuration
connection-string = "jdbc:postgresql://localhost/mydb"
batch-size = 1000
refresh-interval = 1s
}

Usage:
val customJournal = PersistenceQuery(system)
.readJournalFor[CustomReadJournal]("my-custom-journal")

Java wrapper for the custom read journal:
public class CustomJavaReadJournal implements javadsl.ReadJournal,
javadsl.EventsByPersistenceIdQuery,
javadsl.EventsByTagQuery {
private final CustomReadJournal scaladslJournal;
public CustomJavaReadJournal(CustomReadJournal scaladslJournal) {
this.scaladslJournal = scaladslJournal;
}
@Override
public Source<EventEnvelope, NotUsed> eventsByPersistenceId(
String persistenceId,
long fromSequenceNr,
long toSequenceNr) {
return scaladslJournal
.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
.asJava();
}
@Override
public Source<EventEnvelope, NotUsed> eventsByTag(String tag, Offset offset) {
return scaladslJournal
.eventsByTag(tag, offset)
.asJava();
}
}

Optimize query performance with batching:
// Batch event loading
def loadEventsBatch(persistenceIds: List[String]): Future[List[EventEnvelope]] = {
Source(persistenceIds)
.mapAsync(parallelism = 4) { persistenceId =>
readJournal
.currentEventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
.take(100) // Limit per persistence ID
.runWith(Sink.seq)
}
.runWith(Sink.seq)
.map(_.flatten.toList)
}
// Batch tag queries
def loadTaggedEventsBatch(tags: List[String]): Future[List[EventEnvelope]] = {
Source(tags)
.mapAsync(parallelism = 2) { tag =>
readJournal
.currentEventsByTag(tag, NoOffset)
.take(1000) // Limit per tag
.runWith(Sink.seq)
}
.runWith(Sink.seq)
.map(_.flatten.toList)
}

Optimize slice-based processing:
def optimizedSliceProcessing(totalSlices: Int = 1024, processors: Int = 8): Unit = {
val slicesPerProcessor = totalSlices / processors
(0 until processors).foreach { processorId =>
val minSlice = processorId * slicesPerProcessor
val maxSlice = if (processorId == processors - 1) {
totalSlices - 1 // Last processor handles remaining slices
} else {
(processorId + 1) * slicesPerProcessor - 1
}
println(s"Processor $processorId: slices $minSlice to $maxSlice")
firehoseJournal
.eventsBySlices[DomainEvent]("Entity", minSlice, maxSlice, NoOffset)
.buffer(size = 1000, OverflowStrategy.backpressure)
.runForeach { envelope =>
processEventOptimized(processorId, envelope)
}
}
}

Handle errors in custom journal implementations:
class ResilientCustomReadJournal(system: ExtendedActorSystem, config: Config)
extends CustomReadJournal(system, config) {
override def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long
): Source[EventEnvelope, NotUsed] = {
super.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
.recoverWithRetries(attempts = 1, {
// Complete the stream gracefully on timeout; other failures are not
// matched here and still fail the stream
case _: TimeoutException =>
println(s"Timeout loading events for $persistenceId, completing stream")
Source.empty
})
}
}

Handle plugin loading and configuration errors:
def safeLoadReadJournal[T <: ReadJournal](pluginId: String): Option[T] = {
try {
Some(PersistenceQuery(system).readJournalFor[T](pluginId))
} catch {
case _: ClassNotFoundException =>
println(s"Plugin class not found for $pluginId")
None
case _: ConfigurationException =>
println(s"Configuration error for plugin $pluginId")
None
case ex: Exception =>
println(s"Failed to load plugin $pluginId: $ex")
None
}
}
// Usage with fallback
val readJournal = safeLoadReadJournal[LeveldbReadJournal](LeveldbReadJournal.Identifier)
.getOrElse {
println("Falling back to in-memory journal")
getInMemoryReadJournal() // hypothetical fallback helper, not part of Akka
}

Java API for the journal implementations:
import akka.persistence.query.Offset;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
import akka.persistence.query.typed.javadsl.EventsBySliceFirehoseQuery;
// LevelDB Java API (deprecated)
LeveldbReadJournal leveldbJournal = PersistenceQuery.get(system)
.getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier());
// Firehose Java API
EventsBySliceFirehoseQuery firehoseJournal = PersistenceQuery.get(system)
.getReadJournalFor(EventsBySliceFirehoseQuery.class,
EventsBySliceFirehoseQuery.Identifier());
// Query events by slice
firehoseJournal
.<DomainEvent>eventsBySlices("User", 0, 127, Offset.noOffset())
.runForeach(envelope -> {
System.out.println("Event: " + envelope.getEvent());
System.out.println("Slice: " + envelope.slice());
}, system);
// Load specific event
firehoseJournal
.<UserEvent>loadEnvelope("user-123", 42L)
.thenAccept(envelope -> {
System.out.println("Loaded: " + envelope.getEvent());
});

Supporting imports and definitions shared by the Scala examples above:

import akka.NotUsed
import akka.ConfigurationException
import akka.actor.ExtendedActorSystem
import akka.persistence.query.{EventEnvelope, NoOffset, Offset, Sequence, TimestampOffset}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.Config
import java.time.Instant
import java.util.concurrent.TimeoutException
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
case class DomainEvent(eventType: String, data: Map[String, Any])
case class UserEvent(userId: String, eventType: String, data: Map[String, Any])
case class UserSnapshot(userId: String, state: Map[String, Any])
case class UserSnapshotRestored(userId: String, state: Map[String, Any])
// Custom backend interface
trait CustomJournalBackend {
def loadEvent(persistenceId: String, sequenceNr: Long): Future[Option[Any]]
def loadEventsByTag(tag: String, offset: Long, batchSize: Int): Future[List[EventEnvelope]]
def loadPersistenceIds(afterId: Option[String], limit: Long): Future[List[String]]
}
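To make the custom journal example above self-contained, here is a minimal in-memory sketch of the backend trait. The class name, sample data, and the empty tag index are illustrative only; a real backend would query an actual datastore:
// Minimal in-memory CustomJournalBackend, for illustration only
class InMemoryJournalBackend(config: Config) extends CustomJournalBackend {
// persistenceId -> events ordered by sequence number (starting at 1)
private val events: Map[String, Vector[Any]] =
Map("user-123" -> Vector(DomainEvent("created", Map.empty)))

override def loadEvent(persistenceId: String, sequenceNr: Long): Future[Option[Any]] =
Future.successful(events.get(persistenceId).flatMap(_.lift(sequenceNr.toInt - 1)))

override def loadEventsByTag(tag: String, offset: Long, batchSize: Int): Future[List[EventEnvelope]] =
// Tag index elided; an empty result means "no new events yet"
Future.successful(Nil)

override def loadPersistenceIds(afterId: Option[String], limit: Long): Future[List[String]] = {
val sorted = events.keys.toList.sorted
val remaining = afterId.fold(sorted)(id => sorted.dropWhile(_ <= id))
Future.successful(remaining.take(limit.toInt))
}
}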