or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

durable-state.mdevent-envelopes.mdextension.mdindex.mdjournal-implementations.mdoffsets.mdtyped-queries.mduntyped-queries.md
tile.json

journal-implementations.mddocs/

Journal Implementations

Akka Persistence Query provides concrete read journal implementations for specific storage backends. These implementations demonstrate how to build read journals and provide working examples for common use cases.

Capabilities

LevelDB Read Journal (Deprecated)

Reference implementation using LevelDB storage backend. Note that this implementation is deprecated as of Akka 2.6.15.

/**
 * LevelDB read journal implementation.
 * @deprecated Use another journal implementation since 2.6.15
 */
class LeveldbReadJournal extends ReadJournal 
  with PersistenceIdsQuery 
  with CurrentPersistenceIdsQuery 
  with EventsByPersistenceIdQuery 
  with CurrentEventsByPersistenceIdQuery 
  with EventsByTagQuery 
  with CurrentEventsByTagQuery {
  
  def persistenceIds(): Source[String, NotUsed]
  def currentPersistenceIds(): Source[String, NotUsed]
  
  def eventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Source[EventEnvelope, NotUsed]
  def currentEventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Source[EventEnvelope, NotUsed]
  
  def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
  def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}

LevelDB Constants and Factory

Factory methods and constants for LevelDB read journal.

/**
 * LevelDB read journal companion object with plugin identifier.
 * @deprecated Use another journal implementation since 2.6.15
 */
object LeveldbReadJournal {
  /** Plugin identifier for configuration */
  val Identifier = "akka.persistence.query.journal.leveldb"
}

Usage Examples:

import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal

// Note: LevelDB implementation is deprecated
val readJournal = PersistenceQuery(system)
  .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

// Query all persistence IDs
readJournal
  .persistenceIds()
  .take(10)
  .runForeach { persistenceId =>
    println(s"Found persistence ID: $persistenceId")
  }

// Query events by persistence ID
readJournal
  .eventsByPersistenceId("user-123", 0L, Long.MaxValue)
  .runForeach { envelope =>
    println(s"Event: ${envelope.event} at sequence ${envelope.sequenceNr}")
  }

// Query events by tag
readJournal
  .eventsByTag("user-events", NoOffset)
  .runForeach { envelope =>
    println(s"Tagged event: ${envelope.event} from ${envelope.persistenceId}")
  }

// Query current (finite) events
readJournal
  .currentEventsByTag("batch-events", NoOffset)
  .runForeach { envelope =>
    processBatchEvent(envelope.event)
  }
  .onComplete {
    case Success(_) => println("Batch processing complete")
    case Failure(ex) => println(s"Batch processing failed: $ex")
  }

LevelDB Read Journal Provider

Provider implementation for LevelDB read journal plugin.

/**
 * LevelDB read journal provider implementation.
 * @deprecated Use another journal implementation since 2.6.15
 */
class LeveldbReadJournalProvider extends ReadJournalProvider {
  def scaladslReadJournal(): scaladsl.ReadJournal
  def javadslReadJournal(): javadsl.ReadJournal
}

Events by Slice Firehose Query

Advanced implementation providing slice-based querying with multiple query capabilities.

/**
 * Firehose implementation for slice-based event queries.
 * Provides high-performance event streaming with slice-based distribution.
 */
class EventsBySliceFirehoseQuery extends ReadJournal 
  with EventsBySliceQuery
  with EventsBySliceStartingFromSnapshotsQuery
  with EventTimestampQuery
  with LoadEventQuery {
  
  // EventsBySliceQuery methods
  def eventsBySlices[Event](entityType: String, minSlice: Int, maxSlice: Int, offset: Offset): Source[EventEnvelope[Event], NotUsed]
  def sliceForPersistenceId(persistenceId: String): Int
  def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
  
  // EventsBySliceStartingFromSnapshotsQuery methods
  def eventsBySlicesStartingFromSnapshots[Snapshot, Event](
    entityType: String, 
    minSlice: Int, 
    maxSlice: Int, 
    offset: Offset, 
    transformSnapshot: Snapshot => Event
  ): Source[EventEnvelope[Event], NotUsed]
  
  // EventTimestampQuery methods
  def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]]
  
  // LoadEventQuery methods
  def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): Future[EventEnvelope[Event]]
}

Firehose Constants and Factory

Constants and factory methods for the firehose implementation.

/**
 * Firehose query companion object with plugin identifier.
 */
object EventsBySliceFirehoseQuery {
  /** Plugin identifier for configuration */
  val Identifier = "akka.persistence.query.events-by-slice-firehose"
}

Usage Examples:

import akka.persistence.query.PersistenceQuery
import akka.persistence.query.typed.scaladsl.EventsBySliceFirehoseQuery

// Get firehose read journal
val firehoseJournal = PersistenceQuery(system)
  .readJournalFor[EventsBySliceFirehoseQuery](EventsBySliceFirehoseQuery.Identifier)

// Query events by slice range
firehoseJournal
  .eventsBySlices[DomainEvent]("User", 0, 127, offset)
  .runForeach { envelope =>
    println(s"Event from slice ${envelope.slice}: ${envelope.event}")
    println(s"Entity type: ${envelope.entityType}")
    println(s"Tags: ${envelope.tags}")
  }

// Distribute processing across slices
val sliceRanges = firehoseJournal.sliceRanges(8)
sliceRanges.zipWithIndex.foreach { case (range, processorId) =>
  println(s"Starting processor $processorId for slices ${range.start}-${range.end}")
  
  firehoseJournal
    .eventsBySlices[DomainEvent]("Order", range.start, range.end, offset)
    .runForeach { envelope =>
      processInProcessor(processorId, envelope)
    }
}

// Load specific events on demand
firehoseJournal
  .loadEnvelope[UserEvent]("user-123", 42L)
  .foreach { envelope =>
    println(s"Loaded event: ${envelope.event}")
  }

// Query event timestamps
firehoseJournal
  .timestampOf("user-123", 42L)
  .foreach {
    case Some(timestamp) => println(s"Event timestamp: $timestamp")
    case None => println("Event not found")
  }

// Query with snapshot integration
firehoseJournal
  .eventsBySlicesStartingFromSnapshots[UserSnapshot, UserEvent](
    entityType = "User",
    minSlice = 0,
    maxSlice = 255,
    offset = TimestampOffset.Zero,
    transformSnapshot = snapshot => UserSnapshotRestored(snapshot.userId, snapshot.state)
  )
  .runForeach { envelope =>
    envelope.event match {
      case UserSnapshotRestored(userId, state) => 
        println(s"Restored snapshot for $userId")
        initializeUserState(userId, state)
      case regularEvent => 
        processUserEvent(regularEvent)
    }
  }

Firehose Read Journal Provider

Provider implementation for the firehose read journal.

/**
 * Provider for EventsBySliceFirehose query implementation.
 */
class EventsBySliceFirehoseReadJournalProvider extends ReadJournalProvider {
  def scaladslReadJournal(): scaladsl.ReadJournal
  def javadslReadJournal(): javadsl.ReadJournal
}

Configuration

LevelDB Configuration

Example configuration for LevelDB read journal (deprecated):

# LevelDB read journal configuration (deprecated)
akka.persistence.query.journal.leveldb {
  # Implementation class
  class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"
  
  # Reference to the write journal plugin
  write-plugin = "akka.persistence.journal.leveldb"
  
  # Directory where journal files are stored
  dir = "target/journal"
  
  # Maximum number of events to buffer
  max-buffer-size = 100
  
  # Refresh interval for live queries
  refresh-interval = 1s
}

Firehose Configuration

Example configuration for firehose implementation:

# Firehose read journal configuration
akka.persistence.query.events-by-slice-firehose {
  # Implementation class
  class = "akka.persistence.query.typed.EventsBySliceFirehoseReadJournalProvider"
  
  # Number of slices for horizontal partitioning
  number-of-slices = 1024
  
  # Batch size for event retrieval
  batch-size = 1000
  
  # Refresh interval for live queries
  refresh-interval = 500ms
  
  # Event loading configuration
  event-loading {
    # Timeout for loading individual events
    timeout = 10s
    
    # Parallelism for concurrent event loading
    parallelism = 4
  }
}

Implementation Patterns

Custom Read Journal Implementation

Example of implementing a custom read journal:

class CustomReadJournal(system: ExtendedActorSystem, config: Config) 
  extends ReadJournal 
  with EventsByPersistenceIdQuery
  with EventsByTagQuery
  with PersistenceIdsQuery {
  
  private val backend = new CustomJournalBackend(config)
  
  override def eventsByPersistenceId(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long
  ): Source[EventEnvelope, NotUsed] = {
    Source
      .unfoldAsync(fromSequenceNr) { seqNr =>
        if (seqNr > toSequenceNr) {
          Future.successful(None)
        } else {
          backend.loadEvent(persistenceId, seqNr).map {
            case Some(event) =>
              val envelope = EventEnvelope(
                offset = Sequence(seqNr),
                persistenceId = persistenceId,
                sequenceNr = seqNr,
                event = event,
                timestamp = System.currentTimeMillis()
              )
              Some((seqNr + 1, envelope))
            case None =>
              None
          }
        }
      }
  }
  
  override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
    val startOffset = offset match {
      case Sequence(value) => value
      case NoOffset => 0L
      case _ => throw new IllegalArgumentException(s"Unsupported offset type: $offset")
    }
    
    Source
      .unfoldAsync(startOffset) { currentOffset =>
        backend.loadEventsByTag(tag, currentOffset, batchSize = 100).map { events =>
          if (events.nonEmpty) {
            val lastOffset = events.last.sequenceNr
            Some((lastOffset + 1, events))
          } else {
            // No more events, but this is a live query so we continue
            Thread.sleep(1000) // Simple polling - real implementation would use more sophisticated approach
            Some((currentOffset, List.empty))
          }
        }
      }
      .mapConcat(identity)
  }
  
  override def persistenceIds(): Source[String, NotUsed] = {
    Source
      .unfoldAsync(Option.empty[String]) { afterId =>
        backend.loadPersistenceIds(afterId, limit = 100).map { ids =>
          if (ids.nonEmpty) {
            Some((ids.lastOption, ids))
          } else {
            // No more IDs, but this is a live query
            Thread.sleep(1000)
            Some((afterId, List.empty))
          }
        }
      }
      .mapConcat(identity)
  }
}

// Custom provider
class CustomReadJournalProvider extends ReadJournalProvider {
  override def scaladslReadJournal(): scaladsl.ReadJournal = {
    new CustomReadJournal(system, config)
  }
  
  override def javadslReadJournal(): javadsl.ReadJournal = {
    new CustomJavaReadJournal(scaladslReadJournal())
  }
}

Plugin Registration

Register custom journal implementation:

# Custom read journal plugin
my-custom-journal {
  class = "com.example.CustomReadJournalProvider"
  
  # Custom configuration
  connection-string = "jdbc:postgresql://localhost/mydb"
  batch-size = 1000
  refresh-interval = 1s
}

Usage:

val customJournal = PersistenceQuery(system)
  .readJournalFor[CustomReadJournal]("my-custom-journal")

Java API Implementation

Java wrapper for custom read journal:

public class CustomJavaReadJournal implements javadsl.ReadJournal, 
                                              javadsl.EventsByPersistenceIdQuery,
                                              javadsl.EventsByTagQuery {
    
    private final CustomReadJournal scaladslJournal;
    
    public CustomJavaReadJournal(CustomReadJournal scaladslJournal) {
        this.scaladslJournal = scaladslJournal;
    }
    
    @Override
    public Source<EventEnvelope, NotUsed> eventsByPersistenceId(
            String persistenceId, 
            long fromSequenceNr, 
            long toSequenceNr) {
        return scaladslJournal
                .eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
                .asJava();
    }
    
    @Override
    public Source<EventEnvelope, NotUsed> eventsByTag(String tag, Offset offset) {
        return scaladslJournal
                .eventsByTag(tag, offset)
                .asJava();
    }
}

Performance Considerations

Batch Processing

Optimize query performance with batching:

// Batch event loading
def loadEventsBatch(persistenceIds: List[String]): Future[List[EventEnvelope]] = {
  Source(persistenceIds)
    .mapAsync(parallelism = 4) { persistenceId =>
      readJournal
        .currentEventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
        .take(100) // Limit per persistence ID
        .runWith(Sink.seq)
    }
    .runWith(Sink.seq)
    .map(_.flatten.toList)
}

// Batch tag queries
def loadTaggedEventsBatch(tags: List[String]): Future[List[EventEnvelope]] = {
  Source(tags)
    .mapAsync(parallelism = 2) { tag =>
      readJournal
        .currentEventsByTag(tag, NoOffset)
        .take(1000) // Limit per tag
        .runWith(Sink.seq)
    }
    .runWith(Sink.seq)
    .map(_.flatten.toList)
}

Slice Distribution

Optimize slice-based processing:

def optimizedSliceProcessing(totalSlices: Int = 1024, processors: Int = 8): Unit = {
  val slicesPerProcessor = totalSlices / processors
  
  (0 until processors).foreach { processorId =>
    val minSlice = processorId * slicesPerProcessor
    val maxSlice = if (processorId == processors - 1) {
      totalSlices - 1 // Last processor handles remaining slices
    } else {
      (processorId + 1) * slicesPerProcessor - 1
    }
    
    println(s"Processor $processorId: slices $minSlice to $maxSlice")
    
    firehoseJournal
      .eventsBySlices[DomainEvent]("Entity", minSlice, maxSlice, offset)
      .buffer(size = 1000, OverflowStrategy.backpressure)
      .runForeach { envelope =>
        processEventOptimized(processorId, envelope)
      }
  }
}

Error Handling

Journal Implementation Errors

Handle errors in custom journal implementations:

class ResilientCustomReadJournal extends CustomReadJournal {
  
  override def eventsByPersistenceId(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long
  ): Source[EventEnvelope, NotUsed] = {
    
    super.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
      .recover {
        case _: TimeoutException =>
          println(s"Timeout loading events for $persistenceId, retrying...")
          EventEnvelope.empty // Placeholder
        case ex: Exception =>
          println(s"Error loading events for $persistenceId: $ex")
          throw ex
      }
      .filter(_ != EventEnvelope.empty) // Filter out placeholders
  }
}

Plugin Loading Failures

Handle plugin loading and configuration errors:

def safeLoadReadJournal[T <: ReadJournal](pluginId: String): Option[T] = {
  try {
    Some(PersistenceQuery(system).readJournalFor[T](pluginId))
  } catch {
    case _: ClassNotFoundException =>
      println(s"Plugin class not found for $pluginId")
      None
    case _: ConfigurationException =>
      println(s"Configuration error for plugin $pluginId")
      None
    case ex: Exception =>
      println(s"Failed to load plugin $pluginId: $ex")
      None
  }
}

// Usage with fallback
val readJournal = safeLoadReadJournal[LeveldbReadJournal](LeveldbReadJournal.Identifier)
  .getOrElse {
    println("Falling back to in-memory journal")
    getInMemoryReadJournal()
  }

Java API

Java API for journal implementations:

import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
import akka.persistence.query.typed.javadsl.EventsBySliceFirehoseQuery;

// LevelDB Java API (deprecated)
LeveldbReadJournal leveldbJournal = PersistenceQuery.get(system)
    .getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier);

// Firehose Java API
EventsBySliceFirehoseQuery firehoseJournal = PersistenceQuery.get(system)
    .getReadJournalFor(EventsBySliceFirehoseQuery.class, 
                      EventsBySliceFirehoseQuery.Identifier);

// Query events by slice
firehoseJournal
    .eventsBySlices(DomainEvent.class, "User", 0, 127, offset)
    .runForeach(envelope -> {
        System.out.println("Event: " + envelope.getEvent());
        System.out.println("Slice: " + envelope.slice());
    }, system);

// Load specific event
firehoseJournal
    .loadEnvelope(UserEvent.class, "user-123", 42L)
    .thenAccept(envelope -> {
        System.out.println("Loaded: " + envelope.getEvent());
    });

Types

import akka.stream.OverflowStrategy
import java.time.Instant
import scala.concurrent.Future
import akka.stream.scaladsl.{Source, Sink}
import akka.NotUsed
import com.typesafe.config.Config
import akka.actor.ExtendedActorSystem

case class DomainEvent(eventType: String, data: Map[String, Any])
case class UserEvent(userId: String, eventType: String, data: Map[String, Any])
case class UserSnapshot(userId: String, state: Map[String, Any])
case class UserSnapshotRestored(userId: String, state: Map[String, Any])

// Custom backend interface
trait CustomJournalBackend {
  def loadEvent(persistenceId: String, sequenceNr: Long): Future[Option[Any]]
  def loadEventsByTag(tag: String, offset: Long, batchSize: Int): Future[List[EventEnvelope]]
  def loadPersistenceIds(afterId: Option[String], limit: Long): Future[List[String]]
}