CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-persistence-2-13

Event sourcing library for Akka persistence providing comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model.

Pending
Overview
Eval results
Files

journal-api.mddocs/

Journal API

Journal plugin development interfaces for implementing custom persistence backends. These APIs enable developers to create journal plugins that store and retrieve persistent messages with various storage systems.

Capabilities

AsyncWriteJournal

Base trait for implementing asynchronous, non-blocking journal plugins.

/**
 * Abstract journal optimized for asynchronous, non-blocking writes
 */
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
  
  /**
   * Plugin API: Asynchronously write messages to the journal.
   * The returned future must be completed when all AtomicWrite operations are finished.
   * The returned sequence must have the same size as the input sequence and must contain results 
   * for each AtomicWrite in the same order.
   */
  def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]
  
  /**
   * Plugin API: Asynchronously delete messages up to the given sequence number.
   * If `permanent` is false, messages are marked as deleted but not physically removed.
   */
  def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
}

Usage Examples:

import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.{AtomicWrite, PersistentRepr}
import scala.concurrent.Future
import scala.util.{Try, Success, Failure}

class CustomJournal extends AsyncWriteJournal {
  
  override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
    // Custom storage implementation
    Future.traverse(messages) { atomicWrite =>
      val persistenceId = atomicWrite.persistenceId
      val batch = atomicWrite.payload
      
      // Store messages in your backend
      storeMessages(persistenceId, batch).map(_ => Success(())).recover {
        case ex => Failure(ex)
      }
    }
  }
  
  override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
    // Mark messages as deleted in your backend
    markMessagesDeleted(persistenceId, toSequenceNr)
  }
  
  override def asyncReplayMessages(
    persistenceId: String, 
    fromSequenceNr: Long, 
    toSequenceNr: Long, 
    max: Long
  )(recoveryCallback: PersistentRepr => Unit): Future[Unit] = {
    // Replay messages from your backend
    replayFromStore(persistenceId, fromSequenceNr, toSequenceNr, max) { message =>
      recoveryCallback(message)
    }
  }
  
  override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
    // Return highest sequence number from your backend
    getHighestSequenceNr(persistenceId)
  }
  
  // Custom implementation methods
  private def storeMessages(persistenceId: String, messages: immutable.Seq[PersistentRepr]): Future[Unit] = ???
  private def markMessagesDeleted(persistenceId: String, toSequenceNr: Long): Future[Unit] = ???
  private def replayFromStore(persistenceId: String, from: Long, to: Long, max: Long)(callback: PersistentRepr => Unit): Future[Unit] = ???
  private def getHighestSequenceNr(persistenceId: String): Future[Long] = ???
}

AsyncRecovery

Interface for asynchronous message replay and sequence number recovery.

/**
 * Asynchronous message replay and sequence number recovery interface
 */
trait AsyncRecovery {
  
  /**
   * Plugin API: Asynchronously replay persistent messages by calling replayCallback.
   * Must complete when all messages matching sequence number bounds have been replayed.
   */
  def asyncReplayMessages(
    persistenceId: String, 
    fromSequenceNr: Long, 
    toSequenceNr: Long, 
    max: Long
  )(recoveryCallback: PersistentRepr => Unit): Future[Unit]
  
  /**
   * Plugin API: Asynchronously read the highest stored sequence number.
   * Used by persistent actors to determine starting point for new events.
   */
  def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
}

WriteJournalBase

Base functionality for journal implementations including event adapter integration.

/**
 * Base trait providing common journal functionality
 */
private[akka] trait WriteJournalBase {
  this: Actor =>
  
  /** Prepare batch of persistent messages for storage */
  protected def preparePersistentBatch(rb: immutable.Seq[PersistentEnvelope]): immutable.Seq[AtomicWrite]
}

SyncWriteJournal (Deprecated)

Synchronous journal interface - deprecated in favor of AsyncWriteJournal.

/**
 * Synchronous write journal - DEPRECATED
 * Use AsyncWriteJournal instead for better performance
 */
@deprecated("Use AsyncWriteJournal instead", "2.3.4")
trait SyncWriteJournal extends Actor with WriteJournalBase with SyncRecovery {
  
  /** Synchronously write messages - blocks calling thread */
  def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]]
  
  /** Synchronously delete messages - blocks calling thread */
  def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit
}

Journal Message Types

AtomicWrite

Container for a batch of persistent messages that must be written atomically.

/**
 * Atomic write operation containing messages for single persistence ID
 */
case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) {
  /** Persistence ID from first message */
  def persistenceId: String = payload.head.persistenceId
  
  /** Lowest sequence number in batch */
  def lowestSequenceNr: Long = payload.head.sequenceNr
  
  /** Highest sequence number in batch */
  def highestSequenceNr: Long = payload.last.sequenceNr
  
  /** Number of messages in batch */
  def size: Int = payload.size
}

WriteMessages

Journal protocol message for writing persistent messages.

/**
 * Request to write messages to journal
 */
case class WriteMessages(
  messages: immutable.Seq[PersistentEnvelope],
  persistentActor: ActorRef,
  actorInstanceId: Int
)

ReplayMessages

Journal protocol message for replaying messages during recovery.

/**
 * Request to replay messages from journal
 */
case class ReplayMessages(
  fromSequenceNr: Long,
  toSequenceNr: Long,
  max: Long,
  persistenceId: String,
  persistentActor: ActorRef
)

Error Handling

Journal Failures

/** Exception indicating journal operation failure */
case class JournalFailureException(cause: Throwable) extends RuntimeException(cause)

/** Response indicating write message failure */
case class WriteMessageFailure(cause: Throwable, sequenceNr: Long)

/** Response indicating replay message failure */  
case class ReplayMessagesFailure(cause: Throwable)

Plugin Configuration

Journal plugins are configured in application.conf:

akka.persistence.journal {
  plugin = "custom-journal"
  
  # Custom journal configuration
  custom-journal {
    class = "com.example.CustomJournal"
    
    # Plugin-specific settings
    connection-string = "jdbc:postgresql://localhost/events"
    batch-size = 100
    
    # Circuit breaker settings
    circuit-breaker {
      max-failures = 10
      call-timeout = 10s
      reset-timeout = 30s
    }
    
    # Replay filter settings
    replay-filter {
      mode = "repair-by-discard-old"
      window-size = 100
      max-old-writers = 10
      debug = false
    }
  }
}

Advanced Features

Circuit Breaker Integration

AsyncWriteJournal includes built-in circuit breaker protection:

// Circuit breaker configuration
circuit-breaker {
  max-failures = 10      # Number of failures before opening circuit
  call-timeout = 10s     # Timeout for journal operations  
  reset-timeout = 30s    # Time before attempting to close circuit
}

Replay Filter

Filters out corrupt or duplicate messages during replay:

// Replay filter modes
replay-filter {
  mode = "repair-by-discard-old"  # repair-by-discard-old, fail, warn, off
  window-size = 100               # Size of duplicate detection window
  max-old-writers = 10           # Maximum old writers to track
}

Performance Optimization

class OptimizedJournal extends AsyncWriteJournal {
  
  override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
    // Batch multiple AtomicWrites for better throughput
    val batches = messages.grouped(batchSize).toSeq
    
    Future.traverse(batches) { batch =>
      // Write batch to storage with single I/O operation
      writeBatchToStorage(batch)
    }.map(_.flatten)
  }
  
  // Use connection pooling and prepared statements
  private def writeBatchToStorage(batch: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {
    connectionPool.withConnection { connection =>
      val preparedStatement = connection.prepareStatement(insertSQL)
      // Batch insert for better performance
      batch.foreach { atomicWrite =>
        atomicWrite.payload.foreach { repr =>
          preparedStatement.setString(1, repr.persistenceId)
          preparedStatement.setLong(2, repr.sequenceNr)
          preparedStatement.setBytes(3, serialize(repr))
          preparedStatement.addBatch()
        }
      }
      preparedStatement.executeBatch()
    }
  }
}

Testing Journal Plugins

import akka.persistence.journal.JournalSpec

class CustomJournalSpec extends JournalSpec(config = ConfigFactory.parseString("""
  akka.persistence.journal.plugin = "custom-journal"
  custom-journal {
    class = "com.example.CustomJournal"
    # Test configuration
  }
""")) {
  
  "Custom journal" should {
    "write and replay messages" in {
      // Test cases provided by JournalSpec
    }
    
    "handle concurrent writes" in {
      // Custom test cases
    }
  }
}

Migration from Sync to Async

For migrating from SyncWriteJournal to AsyncWriteJournal:

// Old sync implementation
class OldSyncJournal extends SyncWriteJournal {
  def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] = {
    // Blocking operations
    messages.map(syncWrite)
  }
}

// New async implementation  
class NewAsyncJournal extends AsyncWriteJournal {
  def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
    // Non-blocking operations
    Future.traverse(messages)(asyncWrite)
  }
}

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13

docs

at-least-once-delivery.md

durable-state.md

event-adapters.md

index.md

journal-api.md

persistent-actors.md

plugin-development.md

snapshots.md

tile.json