CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-persistence-2-13

Event sourcing library for Akka persistence providing comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model.

Pending
Overview
Eval results
Files

plugin-development.mddocs/

Plugin Development

Comprehensive guide for developing custom journal and snapshot store plugins for Akka Persistence. This documentation covers the plugin API, implementation patterns, and configuration requirements.

Capabilities

Plugin Provider Interfaces

PersistencePlugin

Base interface for all persistence plugins.

/**
 * Base interface for persistence plugins.
 *
 * Declares the one member shared by the plugin types described in this guide:
 * a string identifier.
 */
trait PersistencePlugin {
  /**
   * Unique plugin identifier.
   *
   * @return the identifier of this plugin
   */
  def pluginId: String
}

PersistencePluginProxy

Proxy actor that forwards persistence requests to a target plugin selected through configuration.

/**
 * Proxy that delegates to target plugin based on configuration.
 *
 * API summary only: the members below are listed without bodies.
 *
 * @param targetPluginConfig configuration used to locate/start the target plugin
 */
class PersistencePluginProxy(targetPluginConfig: Config) extends Actor {
  /**
   * Target plugin actor reference.
   *
   * @return the actor that receives the delegated requests
   */
  def targetPlugin: ActorRef
  
  /** Start target plugin with given configuration. */
  def startTargetPlugin(): Unit
}

Journal Plugin Development

AsyncWriteJournal Implementation

The API that every asynchronous journal plugin must implement. A complete example implementation follows below.

/**
 * Base trait for implementing custom journal plugins.
 *
 * A plugin implements the four asynchronous operations below; each one returns a
 * `Future` so the journal actor itself is never blocked by storage I/O.
 */
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
  
  /**
   * Plugin API: Write messages asynchronously.
   *
   * @param messages the atomic writes to persist, in order
   * @return a future holding one `Try[Unit]` per entry of `messages`
   */
  def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]
  
  /**
   * Plugin API: Delete messages asynchronously.
   *
   * @param persistenceId identifier of the persistent actor whose messages are deleted
   * @param toSequenceNr  upper sequence number bound of the deletion
   * @return a future completed when the deletion has finished
   */
  def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
  
  /**
   * Plugin API: Replay messages for recovery.
   *
   * @param persistenceId    identifier of the persistent actor being recovered
   * @param fromSequenceNr   lower sequence number bound of the replay
   * @param toSequenceNr     upper sequence number bound of the replay
   * @param max              maximum number of messages to replay
   * @param recoveryCallback invoked with each replayed message
   * @return a future completed when the replay has finished
   */
  def asyncReplayMessages(
    persistenceId: String, 
    fromSequenceNr: Long, 
    toSequenceNr: Long, 
    max: Long
  )(recoveryCallback: PersistentRepr => Unit): Future[Unit]
  
  /**
   * Plugin API: Read highest sequence number.
   *
   * @param persistenceId  identifier of the persistent actor
   * @param fromSequenceNr sequence number from which the search may start
   * @return a future holding the highest stored sequence number
   */
  def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
}

Implementation Example:

import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.{AtomicWrite, PersistentRepr}
import java.sql.{Connection, PreparedStatement}
import javax.sql.DataSource

/**
 * Example JDBC-backed journal plugin.
 *
 * Events live in a `journal` table with the columns `persistence_id`, `sequence_nr`,
 * `payload`, `manifest` and `deleted`. Deletion is a soft delete: rows are flagged
 * rather than removed, so the highest sequence number is still available afterwards,
 * and replay skips flagged rows.
 *
 * All JDBC resources (connections, statements, result sets) are closed after use.
 */
class DatabaseJournal extends AsyncWriteJournal {
  import scala.util.Using
  import scala.util.control.NonFatal

  // Configuration and connection management
  // NOTE(review): `config` is not defined in this snippet — presumably the plugin's own
  // configuration section (e.g. received through a `Config` constructor parameter); confirm.
  private val dataSource: DataSource = createDataSource()
  private val batchSize: Int = config.getInt("batch-size")

  /**
   * Persists every [[AtomicWrite]] in its own database transaction.
   *
   * Each atomic write is either stored completely or not at all. A failure rolls the
   * transaction back, empties the JDBC batch so the failed rows cannot leak into the
   * next atomic write, and is reported as a `Failure` at the matching result position.
   *
   * @param messages atomic writes to persist, in order
   * @return one `Try[Unit]` per atomic write, in the same order as `messages`
   */
  override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
    Future {
      withConnection { connection =>
        val originalAutoCommit = connection.getAutoCommit
        // Explicit transactions are required to make each AtomicWrite all-or-nothing.
        connection.setAutoCommit(false)
        try {
          Using.resource(
            connection.prepareStatement(
              "INSERT INTO journal (persistence_id, sequence_nr, payload, manifest) VALUES (?, ?, ?, ?)"
            )
          ) { statement =>
            messages.map { atomicWrite =>
              Try {
                try {
                  atomicWrite.payload.foreach { repr =>
                    statement.setString(1, repr.persistenceId)
                    statement.setLong(2, repr.sequenceNr)
                    statement.setBytes(3, serialize(repr.payload))
                    statement.setString(4, repr.manifest)
                    statement.addBatch()
                  }
                  statement.executeBatch()
                  connection.commit()
                } catch {
                  case NonFatal(cause) =>
                    rollbackQuietly(connection, cause)
                    throw cause
                } finally {
                  // Always start the next atomic write with an empty batch.
                  statement.clearBatch()
                }
              }
            }
          }
        } finally {
          // Hand the connection back in the state it was received in.
          connection.setAutoCommit(originalAutoCommit)
        }
      }
    }
  }

  /**
   * Soft-deletes all events of `persistenceId` up to and including `toSequenceNr`.
   *
   * @param persistenceId identifier of the persistent actor
   * @param toSequenceNr  highest sequence number to flag as deleted (inclusive)
   */
  override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
    Future {
      withConnection { connection =>
        Using.resource(
          connection.prepareStatement(
            "UPDATE journal SET deleted = true WHERE persistence_id = ? AND sequence_nr <= ?"
          )
        ) { statement =>
          statement.setString(1, persistenceId)
          statement.setLong(2, toSequenceNr)
          statement.executeUpdate()
          ()
        }
      }
    }
  }

  /**
   * Replays the non-deleted events of `persistenceId` in ascending sequence number order.
   *
   * @param persistenceId    identifier of the persistent actor
   * @param fromSequenceNr   lowest sequence number to replay (inclusive)
   * @param toSequenceNr     highest sequence number to replay (inclusive)
   * @param max              maximum number of events to replay
   * @param recoveryCallback invoked once per replayed event
   */
  override def asyncReplayMessages(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long,
    max: Long
  )(recoveryCallback: PersistentRepr => Unit): Future[Unit] = {
    Future {
      withConnection { connection =>
        // `deleted IS NOT TRUE` excludes soft-deleted rows while still matching rows
        // whose flag is false or was never set (NULL).
        Using.resource(
          connection.prepareStatement(
            """SELECT sequence_nr, payload, manifest FROM journal
               WHERE persistence_id = ? AND sequence_nr >= ? AND sequence_nr <= ?
               AND deleted IS NOT TRUE
               ORDER BY sequence_nr LIMIT ?"""
          )
        ) { statement =>
          statement.setString(1, persistenceId)
          statement.setLong(2, fromSequenceNr)
          statement.setLong(3, toSequenceNr)
          statement.setLong(4, max)

          Using.resource(statement.executeQuery()) { resultSet =>
            while (resultSet.next()) {
              val repr = PersistentRepr(
                payload = deserialize(resultSet.getBytes("payload")),
                sequenceNr = resultSet.getLong("sequence_nr"),
                persistenceId = persistenceId,
                manifest = resultSet.getString("manifest")
              )
              recoveryCallback(repr)
            }
          }
        }
      }
    }
  }

  /**
   * Reads the highest sequence number ever stored for `persistenceId`.
   *
   * Soft-deleted rows are deliberately included. The result is `0` when no row exists.
   *
   * @param persistenceId  identifier of the persistent actor
   * @param fromSequenceNr not used by this implementation
   */
  override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
    Future {
      withConnection { connection =>
        Using.resource(
          connection.prepareStatement(
            "SELECT MAX(sequence_nr) FROM journal WHERE persistence_id = ?"
          )
        ) { statement =>
          statement.setString(1, persistenceId)
          Using.resource(statement.executeQuery()) { resultSet =>
            // MAX over zero rows yields SQL NULL, which getLong reports as 0.
            if (resultSet.next()) resultSet.getLong(1) else 0L
          }
        }
      }
    }
  }

  // Helper methods

  /** Runs `f` with a connection from the data source and always closes the connection. */
  private def withConnection[T](f: Connection => T): T = {
    val connection = dataSource.getConnection()
    try f(connection)
    finally connection.close()
  }

  /** Rolls back, attaching a rollback failure to `cause` instead of hiding `cause`. */
  private def rollbackQuietly(connection: Connection, cause: Throwable): Unit =
    try connection.rollback()
    catch { case NonFatal(rollbackFailure) => cause.addSuppressed(rollbackFailure) }

  private def serialize(obj: Any): Array[Byte] = ??? // Implement serialization
  private def deserialize(bytes: Array[Byte]): Any = ??? // Implement deserialization
  private def createDataSource(): DataSource = ??? // Create database connection pool
}

Snapshot Store Plugin Development

SnapshotStore Implementation

Base trait for implementing snapshot store plugins.

/**
 * Base trait for snapshot store plugins.
 *
 * A plugin implements the asynchronous load/save/delete operations below and may
 * optionally handle its own messages through `receivePluginInternal`.
 */
trait SnapshotStore extends Actor with ActorLogging {
  
  /**
   * Plugin API: Load snapshot matching criteria.
   *
   * @param persistenceId identifier of the persistent actor
   * @param criteria      selection criteria a snapshot has to match
   * @return a future holding the selected snapshot, or `None` if nothing matches
   */
  def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]
  
  /**
   * Plugin API: Save snapshot with metadata.
   *
   * @param metadata metadata describing the snapshot
   * @param snapshot the snapshot payload
   * @return a future completed when the snapshot has been saved
   */
  def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]
  
  /**
   * Plugin API: Delete specific snapshot.
   *
   * @param metadata metadata identifying the snapshot to delete
   * @return a future completed when the deletion has finished
   */
  def deleteAsync(metadata: SnapshotMetadata): Future[Unit]
  
  /**
   * Plugin API: Delete snapshots matching criteria.
   *
   * @param persistenceId identifier of the persistent actor
   * @param criteria      selection criteria of the snapshots to delete
   * @return a future completed when the deletion has finished
   */
  def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]
  
  /**
   * Plugin API: Handle additional plugin-specific messages.
   *
   * Defaults to the empty behavior, i.e. no additional messages are handled.
   */
  def receivePluginInternal: Actor.Receive = Actor.emptyBehavior
}

Implementation Example:

import akka.persistence.snapshot.SnapshotStore
import akka.persistence.{SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria}

/**
 * Example JDBC-backed snapshot store plugin.
 *
 * Snapshots live in a `snapshot_store` table with the columns `persistence_id`,
 * `sequence_nr`, `timestamp` and `snapshot_data`. Both the lower and the upper bounds
 * of a [[SnapshotSelectionCriteria]] are honoured, and all JDBC resources are closed
 * after use.
 */
class DatabaseSnapshotStore extends SnapshotStore {
  import java.sql.{Connection, PreparedStatement}
  import javax.sql.DataSource
  import scala.util.Using

  private val dataSource: DataSource = createDataSource()

  /**
   * Loads the most recent snapshot of `persistenceId` that lies within `criteria`.
   *
   * @param persistenceId identifier of the persistent actor
   * @param criteria      sequence number and timestamp bounds (all inclusive)
   * @return the newest matching snapshot, or `None` if no snapshot matches
   */
  override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
    Future {
      withConnection { connection =>
        Using.resource(
          connection.prepareStatement(
            """SELECT sequence_nr, timestamp, snapshot_data FROM snapshot_store
               WHERE persistence_id = ?
               AND sequence_nr >= ? AND sequence_nr <= ?
               AND timestamp >= ? AND timestamp <= ?
               ORDER BY sequence_nr DESC, timestamp DESC LIMIT 1"""
          )
        ) { statement =>
          bindCriteria(statement, persistenceId, criteria)

          Using.resource(statement.executeQuery()) { resultSet =>
            if (resultSet.next()) {
              val metadata = SnapshotMetadata(
                persistenceId,
                resultSet.getLong("sequence_nr"),
                resultSet.getLong("timestamp")
              )
              val snapshotData = deserialize(resultSet.getBytes("snapshot_data"))
              Some(SelectedSnapshot(metadata, snapshotData))
            } else {
              None
            }
          }
        }
      }
    }
  }

  /**
   * Stores `snapshot` together with its metadata.
   *
   * NOTE(review): this is a plain INSERT; saving a second snapshot for the same
   * persistence id and sequence number depends on the table's key constraints — confirm
   * whether an upsert is needed.
   *
   * @param metadata persistence id, sequence number and timestamp of the snapshot
   * @param snapshot the snapshot payload
   */
  override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
    Future {
      withConnection { connection =>
        Using.resource(
          connection.prepareStatement(
            "INSERT INTO snapshot_store (persistence_id, sequence_nr, timestamp, snapshot_data) VALUES (?, ?, ?, ?)"
          )
        ) { statement =>
          statement.setString(1, metadata.persistenceId)
          statement.setLong(2, metadata.sequenceNr)
          statement.setLong(3, metadata.timestamp)
          statement.setBytes(4, serialize(snapshot))
          statement.executeUpdate()
          ()
        }
      }
    }
  }

  /**
   * Deletes the single snapshot identified by `metadata`.
   *
   * @param metadata persistence id, sequence number and timestamp of the snapshot
   */
  override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = {
    Future {
      withConnection { connection =>
        Using.resource(
          connection.prepareStatement(
            "DELETE FROM snapshot_store WHERE persistence_id = ? AND sequence_nr = ? AND timestamp = ?"
          )
        ) { statement =>
          statement.setString(1, metadata.persistenceId)
          statement.setLong(2, metadata.sequenceNr)
          statement.setLong(3, metadata.timestamp)
          statement.executeUpdate()
          ()
        }
      }
    }
  }

  /**
   * Deletes every snapshot of `persistenceId` that lies within `criteria`.
   *
   * @param persistenceId identifier of the persistent actor
   * @param criteria      sequence number and timestamp bounds (all inclusive)
   */
  override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
    Future {
      withConnection { connection =>
        Using.resource(
          connection.prepareStatement(
            """DELETE FROM snapshot_store
               WHERE persistence_id = ?
               AND sequence_nr >= ? AND sequence_nr <= ?
               AND timestamp >= ? AND timestamp <= ?"""
          )
        ) { statement =>
          bindCriteria(statement, persistenceId, criteria)
          statement.executeUpdate()
          ()
        }
      }
    }
  }

  // Helper methods

  /**
   * Binds the five parameters shared by the criteria-based queries:
   * persistence id, min/max sequence number, min/max timestamp.
   */
  private def bindCriteria(
    statement: PreparedStatement,
    persistenceId: String,
    criteria: SnapshotSelectionCriteria
  ): Unit = {
    statement.setString(1, persistenceId)
    statement.setLong(2, criteria.minSequenceNr)
    statement.setLong(3, criteria.maxSequenceNr)
    statement.setLong(4, criteria.minTimestamp)
    statement.setLong(5, criteria.maxTimestamp)
  }

  /** Runs `f` with a connection from the data source and always closes the connection. */
  private def withConnection[T](f: Connection => T): T = {
    val connection = dataSource.getConnection()
    try f(connection)
    finally connection.close()
  }

  private def serialize(obj: Any): Array[Byte] = ??? // Implement serialization
  private def deserialize(bytes: Array[Byte]): Any = ??? // Implement deserialization
  private def createDataSource(): DataSource = ??? // Create database connection pool
}

Java API Plugin Development

Java Snapshot Store

/**
 * Java API for snapshot store plugins.
 *
 * Each `do*` method is the Java-facing counterpart of the Scala method with the same
 * name minus the `do` prefix.
 *
 * NOTE(review): the return types are listed here as `CompletionStage`; Akka's Java
 * snapshot store plugin API is believed to use `scala.concurrent.Future` instead —
 * verify against the Akka API documentation before relying on these signatures.
 */
abstract class SnapshotStore extends akka.persistence.snapshot.SnapshotStore {
  
  /**
   * Java API: Load snapshot.
   *
   * @param persistenceId identifier of the persistent actor
   * @param criteria      selection criteria a snapshot has to match
   * @return the selected snapshot, or an empty `Optional` if nothing matches
   */
  def doLoadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): CompletionStage[Optional[SelectedSnapshot]]
  
  /**
   * Java API: Save snapshot.
   *
   * @param metadata metadata describing the snapshot
   * @param snapshot the snapshot payload
   */
  def doSaveAsync(metadata: SnapshotMetadata, snapshot: Any): CompletionStage[Void]
  
  /**
   * Java API: Delete snapshot.
   *
   * @param metadata metadata identifying the snapshot to delete
   */
  def doDeleteAsync(metadata: SnapshotMetadata): CompletionStage[Void]
  
  /**
   * Java API: Delete snapshots.
   *
   * @param persistenceId identifier of the persistent actor
   * @param criteria      selection criteria of the snapshots to delete
   */
  def doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): CompletionStage[Void]
}

Plugin Configuration

Journal Plugin Configuration

# Custom journal plugin configuration.
# The section name ("my-journal") is the plugin id referenced at the bottom of this file.
my-journal {
  # Plugin implementation class (fully qualified name of the journal actor)
  class = "com.example.DatabaseJournal"
  
  # Plugin-specific settings, read by the plugin implementation itself
  connection-string = "jdbc:postgresql://localhost/akka_journal"
  username = "akka"
  # NOTE(review): example credentials in plain text — do not ship real secrets this way.
  password = "akka"
  # Read by the example journal via config.getInt("batch-size")
  batch-size = 100
  
  # Circuit breaker configuration
  circuit-breaker {
    max-failures = 10
    call-timeout = 10s
    reset-timeout = 30s
  }
  
  # Replay filter configuration
  replay-filter {
    mode = "repair-by-discard-old"
    window-size = 100
    max-old-writers = 10
    debug = false
  }
  
  # Connection pool settings
  connection-pool {
    initial-size = 5
    max-size = 20
    connection-timeout = 10s
  }
}

# Use the custom journal: the value must match the section name above
akka.persistence.journal.plugin = "my-journal"

Snapshot Store Configuration

# Custom snapshot store plugin configuration.
# The section name ("my-snapshot-store") is the plugin id referenced at the bottom of this file.
my-snapshot-store {
  # Plugin implementation class (fully qualified name of the snapshot store actor)
  class = "com.example.DatabaseSnapshotStore"
  
  # Plugin-specific settings, read by the plugin implementation itself
  connection-string = "jdbc:postgresql://localhost/akka_snapshots"
  username = "akka"
  # NOTE(review): example credentials in plain text — do not ship real secrets this way.
  password = "akka"
  
  # Circuit breaker configuration
  circuit-breaker {
    max-failures = 5
    call-timeout = 10s
    reset-timeout = 30s
  }
}

# Use the custom snapshot store: the value must match the section name above
akka.persistence.snapshot-store.plugin = "my-snapshot-store"

Advanced Plugin Features

Event Adapter Integration

/**
 * Journal excerpt showing how event adapters interact with `asyncWriteMessages`.
 *
 * Only the write path is shown; the remaining journal operations are omitted.
 */
class CustomJournal extends AsyncWriteJournal {

  /**
   * Writes the given batch to the backend.
   *
   * Event adapters are applied by the journal base (via `preparePersistentBatch`) before
   * this method is invoked, so `messages` already contain the adapted events.
   *
   * @param messages atomic writes whose events have already been adapted
   * @return one `Try[Unit]` per atomic write
   */
  override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
    // Do NOT call preparePersistentBatch here: the messages are already adapted, and
    // adapting them a second time would run every event adapter twice.
    writeToBackend(messages)
  }
}

Plugin Metrics and Monitoring

/**
 * Journal excerpt that records write metrics around the backend call.
 *
 * Metrics: a latency histogram and an event counter for successful writes, and an
 * error counter for failed writes.
 */
class MonitoredJournal extends AsyncWriteJournal {
  private val writeLatencyHistogram = registry.histogram("journal.write.latency")
  private val writeCounter = registry.counter("journal.write.count")
  private val errorCounter = registry.counter("journal.write.errors")

  /**
   * Delegates to the backend and updates the metrics once the write completes.
   *
   * @param messages atomic writes to persist
   * @return the backend's result, completed after the metrics have been updated
   */
  override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
    val startedAtNanos = System.nanoTime()
    val pendingWrite = writeToBackend(messages)

    pendingWrite.andThen {
      case Failure(_) => errorCounter.inc()
      case Success(_) => recordSuccessfulWrite(startedAtNanos, messages)
    }
  }

  /** Records the elapsed time and the number of events contained in `messages`. */
  private def recordSuccessfulWrite(startedAtNanos: Long, messages: immutable.Seq[AtomicWrite]): Unit = {
    val elapsedNanos = System.nanoTime() - startedAtNanos
    writeLatencyHistogram.update(elapsedNanos)

    val persistedEvents = messages.map(_.size).sum
    writeCounter.inc(persistedEvents)
  }
}

Plugin Migration Support

/**
 * Sketch of a journal that reads from a new storage first and falls back to an old one.
 *
 * Both storages are represented by child journal actors created from this actor's
 * context. Only the replay path is shown.
 *
 * NOTE(review): this snippet is illustrative rather than complete:
 *  - `recoveryCallback` is never invoked in the body below, so no replayed message is
 *    handed to the recovering actor;
 *  - the reply of `ask` is matched with `messages.nonEmpty`, which presumes a
 *    collection-like reply — confirm what the child journals actually answer;
 *  - the fallback branch yields whatever the old journal replies, not a `Future[Unit]`;
 *  - `ask` presumably needs an implicit timeout and execution context in scope — confirm.
 */
class MigrationJournal extends AsyncWriteJournal {
  // Child actors wrapping the two underlying journal implementations.
  private val oldJournal: ActorRef = context.actorOf(Props[OldJournalPlugin])
  private val newJournal: ActorRef = context.actorOf(Props[NewJournalPlugin])
  
  /**
   * Asks the new journal to replay; if its reply is empty, asks the old journal instead.
   *
   * @param persistenceId    identifier of the persistent actor being recovered
   * @param fromSequenceNr   lower sequence number bound of the replay
   * @param toSequenceNr     upper sequence number bound of the replay
   * @param max              maximum number of messages to replay
   * @param recoveryCallback callback for replayed messages (not used by this sketch)
   */
  override def asyncReplayMessages(
    persistenceId: String, 
    fromSequenceNr: Long, 
    toSequenceNr: Long, 
    max: Long
  )(recoveryCallback: PersistentRepr => Unit): Future[Unit] = {
    // Try new storage first, fallback to old storage
    newJournal.ask(ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId)).flatMap {
      // A non-empty reply from the new journal ends the replay successfully.
      case messages if messages.nonEmpty => Future.successful(())
      // Anything else is delegated to the old journal with the same request.
      case _ => oldJournal.ask(ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId))
    }
  }
}

Testing Plugin Implementations

Journal Plugin Testing

import akka.persistence.journal.JournalSpec

/**
 * Runs the journal compliance test suite against the custom journal plugin.
 *
 * The suite's tests are inherited from `JournalSpec`; this class only supplies the
 * plugin configuration and the capability flags the suite asks for.
 */
class CustomJournalSpec extends JournalSpec(
  config = ConfigFactory.parseString(
    """
    akka.persistence.journal.plugin = "custom-journal"
    custom-journal {
      class = "com.example.CustomJournal"
      # Test configuration
    }
    """
  )
) {
  import akka.persistence.CapabilityFlag

  // Required by JournalSpec: declares whether the journal rejects events that cannot be
  // serialized. Switch to CapabilityFlag.on() if the plugin implements such rejections.
  override protected def supportsRejectingNonSerializableObjects: CapabilityFlag =
    CapabilityFlag.off()

  "Custom journal" should {
    "pass all journal compliance tests" in {
      // Tests provided by JournalSpec
    }
  }
}

Snapshot Store Testing

import akka.persistence.snapshot.SnapshotStoreSpec

/**
 * Runs the snapshot store compliance test suite against the custom snapshot store plugin.
 *
 * The configuration passed to `SnapshotStoreSpec` selects "custom-snapshot-store" as
 * the active snapshot store and binds it to the plugin class under test.
 */
class CustomSnapshotStoreSpec extends SnapshotStoreSpec(
  config = ConfigFactory.parseString(
    """
    akka.persistence.snapshot-store.plugin = "custom-snapshot-store"
    custom-snapshot-store {
      class = "com.example.CustomSnapshotStore"
      # Test configuration
    }
    """
  )
) {
  // Placeholder test with an empty body; the compliance tests themselves come from
  // SnapshotStoreSpec.
  "Custom snapshot store" should {
    "pass all snapshot store compliance tests" in {
      // Tests provided by SnapshotStoreSpec
    }
  }
}

Plugin Deployment

Packaging with sbt

// build.sbt
ThisBuild / organization := "com.example"
ThisBuild / scalaVersion := "2.13.10"

// Single version value so that all Akka modules stay on the same release.
lazy val akkaVersion = "2.8.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion % Test,
  // Provides JournalSpec and SnapshotStoreSpec, the plugin compliance test suites.
  "com.typesafe.akka" %% "akka-persistence-tck" % akkaVersion % Test
)

// Make plugin discoverable: ship the resources directory (e.g. reference.conf) with the
// artifact. Uses the same slash syntax as the settings above.
Compile / resourceDirectories += (Compile / sourceDirectory).value / "resources"

Reference Configuration

# reference.conf - default plugin configuration.
# Groups the journal and the snapshot store of one plugin under a common root section.
my-persistence-plugin {
  # Journal part of the plugin
  journal {
    # Fully qualified name of the journal implementation
    class = "com.example.MyJournal"
    # Default settings
  }
  
  # Snapshot store part of the plugin
  snapshot-store {
    # Fully qualified name of the snapshot store implementation
    class = "com.example.MySnapshotStore"
    # Default settings
  }
}

Performance Optimization

Batching Strategies

/**
 * Journal excerpt that routes writes through a batcher before they reach the storage.
 *
 * The batcher is configured with a batch size of 100 and a flush interval of
 * 50 milliseconds.
 */
class BatchingJournal extends AsyncWriteJournal {
  private val batcher = new MessageBatcher(batchSize = 100, flushInterval = 50.millis)

  /**
   * Hands the messages to the batcher and writes every resulting batch to the storage.
   *
   * @param messages atomic writes to persist
   * @return the concatenated results of all written batches
   */
  override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
    // Batch multiple writes for better throughput
    for (batches <- batcher.addBatch(messages))
      yield batches.flatMap(writeBatchToStorage)
  }
}

Connection Management

/**
 * Journal excerpt demonstrating connection pool lifecycle management.
 *
 * The pool is created together with the actor and closed when the actor stops.
 */
class PooledJournal extends AsyncWriteJournal {
  private val connectionPool = new HikariConnectionPool(config)

  /**
   * Closes the connection pool when the journal actor stops.
   *
   * The parent hook runs in a `finally` block so that it is not skipped when closing
   * the pool throws.
   */
  override def postStop(): Unit = {
    try connectionPool.close()
    finally super.postStop()
  }

  /**
   * Runs `f` asynchronously with a pooled connection.
   *
   * The connection is always closed afterwards, whether `f` succeeds or throws.
   *
   * @param f work to perform with the connection
   * @return a future holding the result of `f`
   */
  private def withConnection[T](f: Connection => T): Future[T] = {
    Future {
      val connection = connectionPool.getConnection()
      try f(connection)
      finally connection.close()
    }
  }
}

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13

docs

at-least-once-delivery.md

durable-state.md

event-adapters.md

index.md

journal-api.md

persistent-actors.md

plugin-development.md

snapshots.md

tile.json