Event sourcing library for Akka Persistence, providing comprehensive event-sourcing capabilities for building persistent, fault-tolerant applications on the actor model.
—
Comprehensive guide for developing custom journal and snapshot store plugins for Akka Persistence. This documentation covers the plugin API, implementation patterns, and configuration requirements.
Base interface for all persistence plugins.
/**
* Base interface shared by all Akka Persistence plugins
* (journals and snapshot stores alike).
*/
trait PersistencePlugin {
/** Unique identifier for this plugin. */
def pluginId: String
}

Proxy for dynamically selecting persistence plugins.
/**
* Actor that delegates persistence operations to a target plugin
* selected from the supplied configuration.
*/
class PersistencePluginProxy(targetPluginConfig: Config) extends Actor {
/** Reference to the plugin actor this proxy delegates to. */
def targetPlugin: ActorRef
/** Starts the target plugin described by `targetPluginConfig`. */
def startTargetPlugin(): Unit
}

Complete implementation template for async journal plugins.
/**
* Base trait for implementing custom journal plugins.
* All operations are asynchronous and return Futures; Akka invokes them
* from the journal actor.
*/
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
/** Plugin API: write a batch of messages; the result contains one Try per AtomicWrite so individual writes can fail independently. */
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]
/** Plugin API: delete (at least logically) all messages up to and including `toSequenceNr`. */
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
/** Plugin API: replay persisted messages for recovery, invoking `recoveryCallback` once per replayed message, up to `max` messages. */
def asyncReplayMessages(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long,
max: Long
)(recoveryCallback: PersistentRepr => Unit): Future[Unit]
/** Plugin API: read the highest stored sequence number for `persistenceId` (0 when no messages exist). */
def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
}

Implementation Example:
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.{AtomicWrite, PersistentRepr}
import java.sql.{Connection, PreparedStatement}
import javax.sql.DataSource
/**
 * Example JDBC-backed journal plugin.
 *
 * All JDBC calls are blocking; they run inside `Future` on the actor's
 * dispatcher. In production, configure a dedicated dispatcher for blocking I/O.
 */
class DatabaseJournal extends AsyncWriteJournal {
  // ExecutionContext for the Futures below — fix: the original snippet had
  // no implicit ExecutionContext in scope at all.
  import context.dispatcher

  // Configuration and connection management.
  // NOTE(review): `config` must be provided by the plugin's configuration
  // lookup — it is not defined in this snippet.
  private val dataSource: DataSource = createDataSource()
  private val batchSize: Int = config.getInt("batch-size")

  /**
   * Writes each AtomicWrite as one JDBC batch; returns one Try per
   * AtomicWrite so individual atomic writes can fail independently.
   */
  override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
    Future {
      withConnection { connection =>
        val statement = connection.prepareStatement(
          "INSERT INTO journal (persistence_id, sequence_nr, payload, manifest) VALUES (?, ?, ?, ?)"
        )
        try {
          messages.map { atomicWrite =>
            // NOTE(review): if addBatch/executeBatch fails mid-way, leftover
            // batch entries are only cleared by the next successful write.
            Try {
              atomicWrite.payload.foreach { repr =>
                statement.setString(1, repr.persistenceId)
                statement.setLong(2, repr.sequenceNr)
                statement.setBytes(3, serialize(repr.payload))
                statement.setString(4, repr.manifest)
                statement.addBatch()
              }
              statement.executeBatch()
              statement.clearBatch()
            }
          }
        } finally statement.close() // fix: statement was leaked in the original
      }
    }
  }

  /** Logically deletes all messages up to and including `toSequenceNr`. */
  override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
    Future {
      withConnection { connection =>
        val statement = connection.prepareStatement(
          "UPDATE journal SET deleted = true WHERE persistence_id = ? AND sequence_nr <= ?"
        )
        try {
          statement.setString(1, persistenceId)
          statement.setLong(2, toSequenceNr)
          statement.executeUpdate()
        } finally statement.close() // fix: statement was leaked in the original
      }
    }
  }

  /** Replays messages in sequence-number order, skipping logically deleted rows. */
  override def asyncReplayMessages(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long,
      max: Long
  )(recoveryCallback: PersistentRepr => Unit): Future[Unit] = {
    Future {
      withConnection { connection =>
        // fix: the original query ignored the `deleted` flag that
        // asyncDeleteMessagesTo sets, so deleted events were replayed.
        // `IS NOT TRUE` also matches rows where the column is NULL.
        val statement = connection.prepareStatement(
          """SELECT sequence_nr, payload, manifest FROM journal
WHERE persistence_id = ? AND sequence_nr >= ? AND sequence_nr <= ?
AND deleted IS NOT TRUE
ORDER BY sequence_nr LIMIT ?"""
        )
        try {
          statement.setString(1, persistenceId)
          statement.setLong(2, fromSequenceNr)
          statement.setLong(3, toSequenceNr)
          statement.setLong(4, max)
          val resultSet = statement.executeQuery()
          while (resultSet.next()) {
            val sequenceNr = resultSet.getLong("sequence_nr")
            val payload = deserialize(resultSet.getBytes("payload"))
            val manifest = resultSet.getString("manifest")
            val repr = PersistentRepr(
              payload = payload,
              sequenceNr = sequenceNr,
              persistenceId = persistenceId,
              manifest = manifest
            )
            recoveryCallback(repr)
          }
        } finally statement.close() // fix: closes the statement and its ResultSet
      }
    }
  }

  /** Reads the highest sequence number ever stored (deleted rows included, so the number never decreases). */
  override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
    Future {
      withConnection { connection =>
        val statement = connection.prepareStatement(
          "SELECT MAX(sequence_nr) FROM journal WHERE persistence_id = ?"
        )
        try {
          statement.setString(1, persistenceId)
          val resultSet = statement.executeQuery()
          // MAX over an empty set yields SQL NULL, for which getLong returns 0L —
          // the correct answer for "no messages stored".
          if (resultSet.next()) resultSet.getLong(1) else 0L
        } finally statement.close() // fix: statement was leaked in the original
      }
    }
  }

  // Helper methods

  /** Borrows a connection from the pool and guarantees it is returned. */
  private def withConnection[T](f: Connection => T): T = {
    val connection = dataSource.getConnection()
    try f(connection)
    finally connection.close()
  }

  /** Serializes a payload to bytes — use Akka serialization in practice. */
  private def serialize(obj: Any): Array[Byte] = ??? // Implement serialization

  /** Deserializes bytes back into a payload object. */
  private def deserialize(bytes: Array[Byte]): Any = ??? // Implement deserialization

  /** Creates the JDBC connection pool from plugin configuration. */
  private def createDataSource(): DataSource = ??? // Create database connection pool
}

Base trait for implementing snapshot store plugins.
/**
* Base trait for snapshot store plugins. Implementations provide
* asynchronous load/save/delete operations; Akka invokes them from the
* snapshot-store actor.
*/
trait SnapshotStore extends Actor with ActorLogging {
/** Plugin API: load a snapshot matching `criteria`, or None when there is no match. */
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]
/** Plugin API: save a snapshot together with its metadata. */
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]
/** Plugin API: delete the single snapshot identified by `metadata`. */
def deleteAsync(metadata: SnapshotMetadata): Future[Unit]
/** Plugin API: delete all snapshots of `persistenceId` matching `criteria`. */
def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]
/** Plugin API: handle additional plugin-specific messages (defaults to handling none). */
def receivePluginInternal: Actor.Receive = Actor.emptyBehavior
}

Implementation Example:
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.{SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria}
/**
 * Example JDBC-backed snapshot store plugin.
 *
 * All JDBC calls are blocking and run inside `Future` on the actor's
 * dispatcher; use a dedicated dispatcher for blocking I/O in production.
 */
class DatabaseSnapshotStore extends SnapshotStore {
  // ExecutionContext for the Futures below — fix: the original snippet had
  // no implicit ExecutionContext in scope.
  import context.dispatcher

  // Connection pool shared by all snapshot operations.
  private val dataSource: DataSource = createDataSource()

  /** Loads the newest snapshot within the criteria's sequence-number and timestamp bounds. */
  override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
    Future {
      withConnection { connection =>
        val statement = connection.prepareStatement(
          """SELECT sequence_nr, timestamp, snapshot_data FROM snapshot_store
WHERE persistence_id = ? AND sequence_nr <= ? AND timestamp <= ?
ORDER BY sequence_nr DESC, timestamp DESC LIMIT 1"""
        )
        try {
          statement.setString(1, persistenceId)
          statement.setLong(2, criteria.maxSequenceNr)
          statement.setLong(3, criteria.maxTimestamp)
          val resultSet = statement.executeQuery()
          if (resultSet.next()) {
            val sequenceNr = resultSet.getLong("sequence_nr")
            val timestamp = resultSet.getLong("timestamp")
            val snapshotData = deserialize(resultSet.getBytes("snapshot_data"))
            val metadata = SnapshotMetadata(persistenceId, sequenceNr, timestamp)
            Some(SelectedSnapshot(metadata, snapshotData))
          } else {
            None
          }
        } finally statement.close() // fix: statement (and its ResultSet) was leaked
      }
    }
  }

  /** Persists a snapshot row keyed by (persistence_id, sequence_nr, timestamp). */
  override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
    Future {
      withConnection { connection =>
        val statement = connection.prepareStatement(
          "INSERT INTO snapshot_store (persistence_id, sequence_nr, timestamp, snapshot_data) VALUES (?, ?, ?, ?)"
        )
        try {
          statement.setString(1, metadata.persistenceId)
          statement.setLong(2, metadata.sequenceNr)
          statement.setLong(3, metadata.timestamp)
          statement.setBytes(4, serialize(snapshot))
          statement.executeUpdate()
        } finally statement.close() // fix: statement was leaked in the original
      }
    }
  }

  /** Deletes exactly the snapshot identified by `metadata`. */
  override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = {
    Future {
      withConnection { connection =>
        val statement = connection.prepareStatement(
          "DELETE FROM snapshot_store WHERE persistence_id = ? AND sequence_nr = ? AND timestamp = ?"
        )
        try {
          statement.setString(1, metadata.persistenceId)
          statement.setLong(2, metadata.sequenceNr)
          statement.setLong(3, metadata.timestamp)
          statement.executeUpdate()
        } finally statement.close() // fix: statement was leaked in the original
      }
    }
  }

  /** Deletes all snapshots of `persistenceId` within the criteria's bounds. */
  override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
    Future {
      withConnection { connection =>
        val statement = connection.prepareStatement(
          """DELETE FROM snapshot_store
WHERE persistence_id = ? AND sequence_nr <= ? AND timestamp <= ?"""
        )
        try {
          statement.setString(1, persistenceId)
          statement.setLong(2, criteria.maxSequenceNr)
          statement.setLong(3, criteria.maxTimestamp)
          statement.executeUpdate()
        } finally statement.close() // fix: statement was leaked in the original
      }
    }
  }

  // Helper methods — fix: the original example called withConnection/serialize/
  // deserialize without defining them anywhere in this class.

  /** Borrows a pooled connection and guarantees it is returned. */
  private def withConnection[T](f: Connection => T): T = {
    val connection = dataSource.getConnection()
    try f(connection)
    finally connection.close()
  }

  /** Serializes a snapshot to bytes — use Akka serialization in practice. */
  private def serialize(obj: Any): Array[Byte] = ??? // Implement serialization

  /** Deserializes bytes back into a snapshot object. */
  private def deserialize(bytes: Array[Byte]): Any = ??? // Implement deserialization

  /** Creates the JDBC connection pool from plugin configuration. */
  private def createDataSource(): DataSource = ??? // Create database connection pool
}

/**
* Java API for snapshot store plugins
*/
abstract class SnapshotStore extends akka.persistence.snapshot.SnapshotStore {
/** Java API: load a snapshot matching the criteria (empty Optional when none matches). */
def doLoadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): CompletionStage[Optional[SelectedSnapshot]]
/** Java API: save a snapshot together with its metadata. */
def doSaveAsync(metadata: SnapshotMetadata, snapshot: Any): CompletionStage[Void]
/** Java API: delete the snapshot identified by the metadata. */
def doDeleteAsync(metadata: SnapshotMetadata): CompletionStage[Void]
/** Java API: delete all snapshots of the persistence id matching the criteria. */
def doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): CompletionStage[Void]
}

# Custom journal plugin configuration
my-journal {
# Fully qualified class name of the plugin implementation
class = "com.example.DatabaseJournal"
# Plugin-specific settings
connection-string = "jdbc:postgresql://localhost/akka_journal"
username = "akka"
password = "akka"
batch-size = 100
# Circuit breaker protecting callers from a failing backend
circuit-breaker {
max-failures = 10
call-timeout = 10s
reset-timeout = 30s
}
# Replay filter configuration (repairs inconsistent replays from multiple writers)
replay-filter {
mode = "repair-by-discard-old"
window-size = 100
max-old-writers = 10
debug = false
}
# Connection pool settings
connection-pool {
initial-size = 5
max-size = 20
connection-timeout = 10s
}
}
# Point Akka Persistence at the custom journal
akka.persistence.journal.plugin = "my-journal"

# Custom snapshot store plugin configuration
my-snapshot-store {
# Fully qualified class name of the plugin implementation
class = "com.example.DatabaseSnapshotStore"
# Plugin-specific settings
connection-string = "jdbc:postgresql://localhost/akka_snapshots"
username = "akka"
password = "akka"
# Circuit breaker protecting callers from a failing backend
circuit-breaker {
max-failures = 5
call-timeout = 10s
reset-timeout = 30s
}
}
# Point Akka Persistence at the custom snapshot store
akka.persistence.snapshot-store.plugin = "my-snapshot-store"

class CustomJournal extends AsyncWriteJournal {
// Event adapters are automatically applied via WriteJournalBase
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
// Messages are already adapted by preparePersistentBatch (inherited from WriteJournalBase)
val adaptedMessages = preparePersistentBatch(messages)
// NOTE(review): `writeToBackend` is not defined in this snippet — supply your storage call.
writeToBackend(adaptedMessages)
}
}

class MonitoredJournal extends AsyncWriteJournal {
// Metrics for observing journal writes.
// NOTE(review): `registry` and `writeToBackend` are not defined in this snippet.
private val writeLatencyHistogram = registry.histogram("journal.write.latency")
private val writeCounter = registry.counter("journal.write.count")
private val errorCounter = registry.counter("journal.write.errors")
/** Wraps the backend write with latency, count and error metrics. */
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
val startTime = System.nanoTime()
// NOTE(review): `andThen` requires an implicit ExecutionContext — none is shown here.
writeToBackend(messages).andThen {
case Success(_) =>
// Latency is recorded in nanoseconds; the counter adds one per event in the batch.
writeLatencyHistogram.update(System.nanoTime() - startTime)
writeCounter.inc(messages.map(_.size).sum)
case Failure(_) =>
errorCounter.inc()
}
}
}

class MigrationJournal extends AsyncWriteJournal {
// Child actors running the old and new journal plugin implementations.
private val oldJournal: ActorRef = context.actorOf(Props[OldJournalPlugin])
private val newJournal: ActorRef = context.actorOf(Props[NewJournalPlugin])
/**
 * Replays from the new storage first, falling back to the old storage.
 *
 * NOTE(review): as written, messages returned by the new journal are never
 * passed to `recoveryCallback` — the result is discarded. `ask` also needs
 * an implicit Timeout and ExecutionContext, neither shown in this snippet.
 */
override def asyncReplayMessages(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long,
max: Long
)(recoveryCallback: PersistentRepr => Unit): Future[Unit] = {
// Try new storage first, fallback to old storage
newJournal.ask(ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId)).flatMap {
case messages if messages.nonEmpty => Future.successful(())
case _ => oldJournal.ask(ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId))
}
}
}

import akka.persistence.journal.JournalSpec
/**
 * Compliance test: runs the Akka Persistence plugin TCK (JournalSpec)
 * against the journal configured below.
 */
class CustomJournalSpec extends JournalSpec(
config = ConfigFactory.parseString(
"""
akka.persistence.journal.plugin = "custom-journal"
custom-journal {
class = "com.example.CustomJournal"
# Test configuration
}
"""
)
) {
"Custom journal" should {
"pass all journal compliance tests" in {
// Standard test cases are inherited from JournalSpec
}
}
}

import akka.persistence.snapshot.SnapshotStoreSpec
/**
 * Compliance test: runs the Akka Persistence plugin TCK (SnapshotStoreSpec)
 * against the snapshot store configured below.
 */
class CustomSnapshotStoreSpec extends SnapshotStoreSpec(
config = ConfigFactory.parseString(
"""
akka.persistence.snapshot-store.plugin = "custom-snapshot-store"
custom-snapshot-store {
class = "com.example.CustomSnapshotStore"
# Test configuration
}
"""
)
) {
"Custom snapshot store" should {
"pass all snapshot store compliance tests" in {
// Standard test cases are inherited from SnapshotStoreSpec
}
}
}

// build.sbt
// build.sbt — build definition for a custom persistence plugin
ThisBuild / organization := "com.example"
ThisBuild / scalaVersion := "2.13.10"

lazy val akkaVersion = "2.8.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion % Test
)

// Make plugin discoverable (ship reference.conf with the JAR).
// Fix: use sbt 1.x slash syntax for consistency with the `ThisBuild /`
// settings above; the `key in Scope` form is deprecated.
Compile / resourceDirectories += (Compile / sourceDirectory).value / "resources"

# reference.conf - default plugin configuration
# Shipped inside the plugin JAR so users get sensible defaults; values here
# are overridden by the application's application.conf.
my-persistence-plugin {
journal {
class = "com.example.MyJournal"
# Default settings
}
snapshot-store {
class = "com.example.MySnapshotStore"
# Default settings
}
}

class BatchingJournal extends AsyncWriteJournal {
// Accumulates writes across calls and flushes by size or time.
// NOTE(review): `MessageBatcher`, `writeBatchToStorage` and the duration
// syntax (`50.millis`) require definitions/imports not shown in this snippet.
private val batcher = new MessageBatcher(batchSize = 100, flushInterval = 50.millis)
/** Groups incoming writes into larger storage batches for better throughput. */
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
// Batch multiple writes for better throughput
batcher.addBatch(messages).map { batches =>
batches.flatMap(writeBatchToStorage)
}
}
}

class PooledJournal extends AsyncWriteJournal {
// Connection pool shared by all operations; closed when the actor stops.
// NOTE(review): `HikariConnectionPool` and `config` are not defined in this snippet.
private val connectionPool = new HikariConnectionPool(config)
/** Releases the pool on actor shutdown before running the default postStop. */
override def postStop(): Unit = {
connectionPool.close()
super.postStop()
}
/**
 * Runs `f` with a pooled connection inside a Future, always returning the
 * connection to the pool.
 * NOTE(review): `Future { ... }` requires an implicit ExecutionContext — none
 * is shown in this snippet.
 */
private def withConnection[T](f: Connection => T): Future[T] = {
Future {
val connection = connectionPool.getConnection()
try f(connection)
finally connection.close()
}
}
}

Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13