Event sourcing library for Akka persistence providing comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model.
—
Journal plugin development interfaces for implementing custom persistence backends. These APIs enable developers to create journal plugins that store and retrieve persistent messages with various storage systems.
Base trait for implementing asynchronous, non-blocking journal plugins.
/**
 * Abstract journal optimized for asynchronous, non-blocking writes.
 * Mixes in WriteJournalBase for batch preparation and AsyncRecovery for
 * replay / highest-sequence-number queries.
 */
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
/**
 * Plugin API: Asynchronously write messages to the journal.
 * The returned future must be completed when all AtomicWrite operations are finished.
 * The returned sequence must have the same size as the input sequence and must contain results
 * for each AtomicWrite in the same order.
 */
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]
/**
 * Plugin API: Asynchronously delete messages up to the given sequence number.
 * NOTE(review): the signature declares no `permanent` flag, so the previous mention
 * of "if `permanent` is false" was stale; whether deletion is logical (tombstone)
 * or physical is left to the plugin implementation — confirm against the backend.
 */
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
}

Usage Examples:
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.{AtomicWrite, PersistentRepr}
import scala.concurrent.Future
import scala.util.{Try, Success, Failure}
/**
 * Example AsyncWriteJournal implementation backed by a custom store.
 * The storage primitives at the bottom are intentionally unimplemented (???).
 * NOTE(review): Future.traverse/map/recover below require an implicit
 * ExecutionContext in scope — not shown in this snippet; confirm which
 * dispatcher the plugin should use.
 */
class CustomJournal extends AsyncWriteJournal {
/** Writes each AtomicWrite, yielding one Try[Unit] per input element, in order. */
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
// Custom storage implementation
Future.traverse(messages) { atomicWrite =>
val persistenceId = atomicWrite.persistenceId
val batch = atomicWrite.payload
// Store messages in your backend; fold success/failure into Try so one failed
// AtomicWrite is reported individually instead of failing the whole Future
storeMessages(persistenceId, batch).map(_ => Success(())).recover {
case ex => Failure(ex)
}
}
}
/** Logical deletion: delegates to the backend's marker mechanism. */
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
// Mark messages as deleted in your backend
markMessagesDeleted(persistenceId, toSequenceNr)
}
/** Streams stored messages to recoveryCallback during actor recovery. */
override def asyncReplayMessages(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long,
max: Long
)(recoveryCallback: PersistentRepr => Unit): Future[Unit] = {
// Replay messages from your backend
replayFromStore(persistenceId, fromSequenceNr, toSequenceNr, max) { message =>
recoveryCallback(message)
}
}
/** Highest stored sequence number for the given persistence id.
 * NOTE(review): fromSequenceNr is ignored here — acceptable for stores that can
 * answer directly, but verify the backend honours the lower-bound hint. */
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
// Return highest sequence number from your backend
getHighestSequenceNr(persistenceId)
}
// Custom implementation methods — to be provided by the concrete plugin
private def storeMessages(persistenceId: String, messages: immutable.Seq[PersistentRepr]): Future[Unit] = ???
private def markMessagesDeleted(persistenceId: String, toSequenceNr: Long): Future[Unit] = ???
private def replayFromStore(persistenceId: String, from: Long, to: Long, max: Long)(callback: PersistentRepr => Unit): Future[Unit] = ???
private def getHighestSequenceNr(persistenceId: String): Future[Long] = ???
}

Interface for asynchronous message replay and sequence number recovery.
/**
 * Asynchronous message replay and sequence number recovery interface.
 */
trait AsyncRecovery {
/**
 * Plugin API: Asynchronously replay persistent messages by invoking `recoveryCallback`
 * (the earlier doc referred to it as "replayCallback", which does not match the
 * parameter name). Must complete when all messages matching the sequence number
 * bounds — capped at `max` messages — have been replayed.
 */
def asyncReplayMessages(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long,
max: Long
)(recoveryCallback: PersistentRepr => Unit): Future[Unit]
/**
 * Plugin API: Asynchronously read the highest stored sequence number.
 * Used by persistent actors to determine starting point for new events.
 * NOTE(review): `fromSequenceNr` is presumably a lower-bound hint the store may
 * use to narrow its search — confirm against the plugin contract.
 */
def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
}

Base functionality for journal implementations including event adapter integration.
/**
 * Base trait providing common journal functionality shared by the sync and
 * async journal traits. Self-typed to Actor, so it can only be mixed into actors.
 */
private[akka] trait WriteJournalBase {
this: Actor =>
/** Prepare a batch of persistent messages for storage.
 * NOTE(review): per the surrounding docs this is where event-adapter integration
 * happens, but that is not visible in this snippet — confirm in the implementation. */
protected def preparePersistentBatch(rb: immutable.Seq[PersistentEnvelope]): immutable.Seq[AtomicWrite]
}

Synchronous journal interface - deprecated in favor of AsyncWriteJournal.
/**
 * Synchronous write journal — DEPRECATED since 2.3.4.
 * Use AsyncWriteJournal instead for better performance: both operations below
 * block the calling thread until the store responds.
 */
@deprecated("Use AsyncWriteJournal instead", "2.3.4")
trait SyncWriteJournal extends Actor with WriteJournalBase with SyncRecovery {
/** Synchronously write messages — blocks the calling thread until done */
def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]]
/** Synchronously delete messages up to toSequenceNr — blocks the calling thread */
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit
}

Container for a batch of persistent messages that must be written atomically.
/**
 * Atomic write operation containing messages for a single persistence ID.
 * NOTE(review): the accessors below use payload.head/payload.last, so payload is
 * assumed non-empty and presumably ordered by sequenceNr, with all messages
 * sharing the first message's persistenceId — confirm these invariants at
 * construction sites.
 */
case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) {
/** Persistence ID from first message (throws on empty payload) */
def persistenceId: String = payload.head.persistenceId
/** Lowest sequence number in batch (assumes payload sorted — see class note) */
def lowestSequenceNr: Long = payload.head.sequenceNr
/** Highest sequence number in batch (assumes payload sorted — see class note) */
def highestSequenceNr: Long = payload.last.sequenceNr
/** Number of messages in batch */
def size: Int = payload.size
}

Journal protocol message for writing persistent messages.
/**
 * Request to write messages to journal.
 */
case class WriteMessages(
messages: immutable.Seq[PersistentEnvelope], // envelopes to persist
persistentActor: ActorRef, // requesting persistent actor (presumably the reply target — confirm)
actorInstanceId: Int // NOTE(review): likely disambiguates actor incarnations — confirm
)

Journal protocol message for replaying messages during recovery.
/**
 * Request to replay messages from journal during recovery.
 */
case class ReplayMessages(
fromSequenceNr: Long, // lower bound (inclusivity not shown here — TODO confirm)
toSequenceNr: Long, // upper bound
max: Long, // maximum number of messages to replay
persistenceId: String, // persistence id whose messages are replayed
persistentActor: ActorRef // presumably the recipient of the replayed messages — confirm
)

/** Exception indicating journal operation failure */
case class JournalFailureException(cause: Throwable) extends RuntimeException(cause)
/** Response indicating failure to write the message with the given sequence number */
case class WriteMessageFailure(cause: Throwable, sequenceNr: Long)
/** Response indicating a failure while replaying messages during recovery */
case class ReplayMessagesFailure(cause: Throwable)

Journal plugins are configured in application.conf:
# Journal plugin selection and configuration (application.conf)
akka.persistence.journal {
plugin = "custom-journal"
# Custom journal configuration
custom-journal {
class = "com.example.CustomJournal"
# Plugin-specific settings
connection-string = "jdbc:postgresql://localhost/events"
batch-size = 100
# Circuit breaker settings (guards journal calls against a failing backend)
circuit-breaker {
max-failures = 10
call-timeout = 10s
reset-timeout = 30s
}
# Replay filter settings (detects corrupt/duplicate events during recovery)
replay-filter {
mode = "repair-by-discard-old"
window-size = 100
max-old-writers = 10
debug = false
}
}
}

AsyncWriteJournal includes built-in circuit breaker protection:
// Circuit breaker configuration
circuit-breaker {
max-failures = 10 # Number of failures before opening the circuit
call-timeout = 10s # Timeout applied to each journal operation
reset-timeout = 30s # Time to wait before attempting to close the circuit again
}

Filters out corrupt or duplicate messages during replay:
// Replay filter modes
replay-filter {
mode = "repair-by-discard-old" # one of: repair-by-discard-old, fail, warn, off
window-size = 100 # Size of duplicate detection window
max-old-writers = 10 # Maximum number of old writers to track
}

class OptimizedJournal extends AsyncWriteJournal {
/** Groups incoming AtomicWrites into larger storage batches for throughput.
 * NOTE(review): requires `batchSize` and an implicit ExecutionContext in scope —
 * neither is shown in this snippet. */
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
// Batch multiple AtomicWrites for better throughput
val batches = messages.grouped(batchSize).toSeq
Future.traverse(batches) { batch =>
// Write batch to storage with single I/O operation
writeBatchToStorage(batch)
}.map(_.flatten) // flatten restores one Try[Unit] per input AtomicWrite, in order
}
// Use connection pooling and prepared statements
/** Writes a batch of AtomicWrites using a single JDBC batch insert.
 * NOTE(review): the PreparedStatement is never closed — wrap it in try/finally
 * (or have withConnection manage statement lifecycle) to avoid a resource leak.
 * NOTE(review): executeBatch() returns Array[Int], not Seq[Try[Unit]]; this
 * illustrative snippet presumably relies on withConnection to adapt the result —
 * confirm against the real helper.
 * NOTE(review): JDBC calls block; run this on a dedicated blocking dispatcher. */
private def writeBatchToStorage(batch: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {
connectionPool.withConnection { connection =>
val preparedStatement = connection.prepareStatement(insertSQL)
// Batch insert for better performance: one addBatch() per PersistentRepr
batch.foreach { atomicWrite =>
atomicWrite.payload.foreach { repr =>
preparedStatement.setString(1, repr.persistenceId)
preparedStatement.setLong(2, repr.sequenceNr)
preparedStatement.setBytes(3, serialize(repr))
preparedStatement.addBatch()
}
}
preparedStatement.executeBatch()
}
}
}

import akka.persistence.journal.JournalSpec
/** Plugin TCK: extending JournalSpec runs the standard journal compatibility
 * test suite against the plugin configured in the supplied Config. */
class CustomJournalSpec extends JournalSpec(config = ConfigFactory.parseString("""
akka.persistence.journal.plugin = "custom-journal"
custom-journal {
class = "com.example.CustomJournal"
# Test configuration
}
""")) {
"Custom journal" should {
"write and replay messages" in {
// Standard test cases provided by JournalSpec
}
"handle concurrent writes" in {
// Custom, plugin-specific test cases go here
}
}
}

For migrating from SyncWriteJournal to AsyncWriteJournal:
// Old sync implementation (deprecated SyncWriteJournal API)
class OldSyncJournal extends SyncWriteJournal {
  /** Writes every AtomicWrite in order, returning one Try[Unit] per write. */
  def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] = {
    // Each write blocks the journal's thread until the store has responded
    for (write <- messages) yield syncWrite(write)
  }
}
// New async implementation
/** Non-blocking replacement: each write is performed asynchronously and the
 * per-write results are collected into a single Future.
 * NOTE(review): Future.traverse needs an implicit ExecutionContext (not shown). */
class NewAsyncJournal extends AsyncWriteJournal {
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
// Non-blocking operations; result order matches input order
Future.traverse(messages)(asyncWrite)
}
}

Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13