0
# Plugin Development
1
2
Comprehensive guide for developing custom journal and snapshot store plugins for Akka Persistence. This documentation covers the plugin API, implementation patterns, and configuration requirements.
3
4
## Capabilities
5
6
### Plugin Provider Interfaces
7
8
#### PersistencePlugin
9
10
Base interface for all persistence plugins.
11
12
```scala { .api }
13
/**
14
* Base interface for persistence plugins
15
*/
16
trait PersistencePlugin {
17
/** Unique plugin identifier */
18
def pluginId: String
19
}
20
```
21
22
#### PersistencePluginProxy
23
24
Proxy for dynamically selecting persistence plugins.
25
26
```scala { .api }
27
/**
28
* Proxy that delegates to target plugin based on configuration
29
*/
30
class PersistencePluginProxy(targetPluginConfig: Config) extends Actor {
31
/** Target plugin actor reference */
32
def targetPlugin: ActorRef
33
34
/** Start target plugin with given configuration */
35
def startTargetPlugin(): Unit
36
}
37
```
38
39
### Journal Plugin Development
40
41
#### AsyncWriteJournal Implementation
42
43
Complete implementation template for async journal plugins.
44
45
```scala { .api }
46
/**
47
* Base trait for implementing custom journal plugins
48
*/
49
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
50
51
/** Plugin API: Write messages asynchronously */
52
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]
53
54
/** Plugin API: Delete messages asynchronously */
55
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
56
57
/** Plugin API: Replay messages for recovery */
58
def asyncReplayMessages(
59
persistenceId: String,
60
fromSequenceNr: Long,
61
toSequenceNr: Long,
62
max: Long
63
)(recoveryCallback: PersistentRepr => Unit): Future[Unit]
64
65
/** Plugin API: Read highest sequence number */
66
def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
67
}
68
```
69
70
**Implementation Example:**
71
72
```scala
73
import akka.persistence.journal.AsyncWriteJournal
74
import akka.persistence.{AtomicWrite, PersistentRepr}
75
import java.sql.{Connection, PreparedStatement}
76
import javax.sql.DataSource
77
78
class DatabaseJournal extends AsyncWriteJournal {
79
80
// Configuration and connection management
81
private val dataSource: DataSource = createDataSource()
82
private val batchSize: Int = config.getInt("batch-size")
83
84
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
85
Future {
86
withConnection { connection =>
87
val statement = connection.prepareStatement(
88
"INSERT INTO journal (persistence_id, sequence_nr, payload, manifest) VALUES (?, ?, ?, ?)"
89
)
90
91
messages.map { atomicWrite =>
92
Try {
93
atomicWrite.payload.foreach { repr =>
94
statement.setString(1, repr.persistenceId)
95
statement.setLong(2, repr.sequenceNr)
96
statement.setBytes(3, serialize(repr.payload))
97
statement.setString(4, repr.manifest)
98
statement.addBatch()
99
}
100
statement.executeBatch()
101
statement.clearBatch()
102
}
103
}
104
}
105
}
106
}
107
108
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
109
Future {
110
withConnection { connection =>
111
val statement = connection.prepareStatement(
112
"UPDATE journal SET deleted = true WHERE persistence_id = ? AND sequence_nr <= ?"
113
)
114
statement.setString(1, persistenceId)
115
statement.setLong(2, toSequenceNr)
116
statement.executeUpdate()
117
}
118
}
119
}
120
121
override def asyncReplayMessages(
122
persistenceId: String,
123
fromSequenceNr: Long,
124
toSequenceNr: Long,
125
max: Long
126
)(recoveryCallback: PersistentRepr => Unit): Future[Unit] = {
127
Future {
128
withConnection { connection =>
129
val statement = connection.prepareStatement(
130
"""SELECT sequence_nr, payload, manifest FROM journal
131
WHERE persistence_id = ? AND sequence_nr >= ? AND sequence_nr <= ?
132
ORDER BY sequence_nr LIMIT ?"""
133
)
134
statement.setString(1, persistenceId)
135
statement.setLong(2, fromSequenceNr)
136
statement.setLong(3, toSequenceNr)
137
statement.setLong(4, max)
138
139
val resultSet = statement.executeQuery()
140
while (resultSet.next()) {
141
val sequenceNr = resultSet.getLong("sequence_nr")
142
val payload = deserialize(resultSet.getBytes("payload"))
143
val manifest = resultSet.getString("manifest")
144
145
val repr = PersistentRepr(
146
payload = payload,
147
sequenceNr = sequenceNr,
148
persistenceId = persistenceId,
149
manifest = manifest
150
)
151
recoveryCallback(repr)
152
}
153
}
154
}
155
}
156
157
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
158
Future {
159
withConnection { connection =>
160
val statement = connection.prepareStatement(
161
"SELECT MAX(sequence_nr) FROM journal WHERE persistence_id = ?"
162
)
163
statement.setString(1, persistenceId)
164
val resultSet = statement.executeQuery()
165
if (resultSet.next()) resultSet.getLong(1) else 0L
166
}
167
}
168
}
169
170
// Helper methods
171
private def withConnection[T](f: Connection => T): T = {
172
val connection = dataSource.getConnection()
173
try f(connection)
174
finally connection.close()
175
}
176
177
private def serialize(obj: Any): Array[Byte] = ??? // Implement serialization
178
private def deserialize(bytes: Array[Byte]): Any = ??? // Implement deserialization
179
private def createDataSource(): DataSource = ??? // Create database connection pool
180
}
181
```
182
183
### Snapshot Store Plugin Development
184
185
#### SnapshotStore Implementation
186
187
Base trait for implementing snapshot store plugins.
188
189
```scala { .api }
190
/**
191
* Base trait for snapshot store plugins
192
*/
193
trait SnapshotStore extends Actor with ActorLogging {
194
195
/** Plugin API: Load snapshot matching criteria */
196
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]
197
198
/** Plugin API: Save snapshot with metadata */
199
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]
200
201
/** Plugin API: Delete specific snapshot */
202
def deleteAsync(metadata: SnapshotMetadata): Future[Unit]
203
204
/** Plugin API: Delete snapshots matching criteria */
205
def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]
206
207
/** Plugin API: Handle additional plugin-specific messages */
208
def receivePluginInternal: Actor.Receive = Actor.emptyBehavior
209
}
210
```
211
212
**Implementation Example:**
213
214
```scala
215
import akka.persistence.snapshot.SnapshotStore
216
import akka.persistence.{SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria}
217
218
class DatabaseSnapshotStore extends SnapshotStore {
219
220
override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
221
Future {
222
withConnection { connection =>
223
val statement = connection.prepareStatement(
224
"""SELECT sequence_nr, timestamp, snapshot_data FROM snapshot_store
225
WHERE persistence_id = ? AND sequence_nr <= ? AND timestamp <= ?
226
ORDER BY sequence_nr DESC, timestamp DESC LIMIT 1"""
227
)
228
statement.setString(1, persistenceId)
229
statement.setLong(2, criteria.maxSequenceNr)
230
statement.setLong(3, criteria.maxTimestamp)
231
232
val resultSet = statement.executeQuery()
233
if (resultSet.next()) {
234
val sequenceNr = resultSet.getLong("sequence_nr")
235
val timestamp = resultSet.getLong("timestamp")
236
val snapshotData = deserialize(resultSet.getBytes("snapshot_data"))
237
238
val metadata = SnapshotMetadata(persistenceId, sequenceNr, timestamp)
239
Some(SelectedSnapshot(metadata, snapshotData))
240
} else {
241
None
242
}
243
}
244
}
245
}
246
247
override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
248
Future {
249
withConnection { connection =>
250
val statement = connection.prepareStatement(
251
"INSERT INTO snapshot_store (persistence_id, sequence_nr, timestamp, snapshot_data) VALUES (?, ?, ?, ?)"
252
)
253
statement.setString(1, metadata.persistenceId)
254
statement.setLong(2, metadata.sequenceNr)
255
statement.setLong(3, metadata.timestamp)
256
statement.setBytes(4, serialize(snapshot))
257
statement.executeUpdate()
258
}
259
}
260
}
261
262
override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = {
263
Future {
264
withConnection { connection =>
265
val statement = connection.prepareStatement(
266
"DELETE FROM snapshot_store WHERE persistence_id = ? AND sequence_nr = ? AND timestamp = ?"
267
)
268
statement.setString(1, metadata.persistenceId)
269
statement.setLong(2, metadata.sequenceNr)
270
statement.setLong(3, metadata.timestamp)
271
statement.executeUpdate()
272
}
273
}
274
}
275
276
override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
277
Future {
278
withConnection { connection =>
279
val statement = connection.prepareStatement(
280
"""DELETE FROM snapshot_store
281
WHERE persistence_id = ? AND sequence_nr <= ? AND timestamp <= ?"""
282
)
283
statement.setString(1, persistenceId)
284
statement.setLong(2, criteria.maxSequenceNr)
285
statement.setLong(3, criteria.maxTimestamp)
286
statement.executeUpdate()
287
}
288
}
289
}
290
}
291
```
292
293
### Java API Plugin Development
294
295
#### Java Snapshot Store
296
297
```scala { .api }
298
/**
299
* Java API for snapshot store plugins
300
*/
301
abstract class SnapshotStore extends akka.persistence.snapshot.SnapshotStore {
302
303
/** Java API: Load snapshot */
304
def doLoadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): CompletionStage[Optional[SelectedSnapshot]]
305
306
/** Java API: Save snapshot */
307
def doSaveAsync(metadata: SnapshotMetadata, snapshot: Any): CompletionStage[Void]
308
309
/** Java API: Delete snapshot */
310
def doDeleteAsync(metadata: SnapshotMetadata): CompletionStage[Void]
311
312
/** Java API: Delete snapshots */
313
def doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): CompletionStage[Void]
314
}
315
```
316
317
## Plugin Configuration
318
319
### Journal Plugin Configuration
320
321
```hocon
322
# Custom journal plugin configuration
323
my-journal {
324
# Plugin implementation class
325
class = "com.example.DatabaseJournal"
326
327
# Plugin-specific settings
328
connection-string = "jdbc:postgresql://localhost/akka_journal"
329
username = "akka"
330
password = "akka"
331
batch-size = 100
332
333
# Circuit breaker configuration
334
circuit-breaker {
335
max-failures = 10
336
call-timeout = 10s
337
reset-timeout = 30s
338
}
339
340
# Replay filter configuration
341
replay-filter {
342
mode = "repair-by-discard-old"
343
window-size = 100
344
max-old-writers = 10
345
debug = false
346
}
347
348
# Connection pool settings
349
connection-pool {
350
initial-size = 5
351
max-size = 20
352
connection-timeout = 10s
353
}
354
}
355
356
# Use the custom journal
357
akka.persistence.journal.plugin = "my-journal"
358
```
359
360
### Snapshot Store Configuration
361
362
```hocon
363
# Custom snapshot store plugin configuration
364
my-snapshot-store {
365
class = "com.example.DatabaseSnapshotStore"
366
367
# Plugin-specific settings
368
connection-string = "jdbc:postgresql://localhost/akka_snapshots"
369
username = "akka"
370
password = "akka"
371
372
# Circuit breaker configuration
373
circuit-breaker {
374
max-failures = 5
375
call-timeout = 10s
376
reset-timeout = 30s
377
}
378
}
379
380
# Use the custom snapshot store
381
akka.persistence.snapshot-store.plugin = "my-snapshot-store"
382
```
383
384
## Advanced Plugin Features
385
386
### Event Adapter Integration
387
388
```scala
389
class CustomJournal extends AsyncWriteJournal {
390
391
// Event adapters are automatically applied via WriteJournalBase
392
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
393
// Messages are already adapted by preparePersistentBatch
394
val adaptedMessages = preparePersistentBatch(messages)
395
writeToBackend(adaptedMessages)
396
}
397
}
398
```
399
400
### Plugin Metrics and Monitoring
401
402
```scala
403
class MonitoredJournal extends AsyncWriteJournal {
404
private val writeLatencyHistogram = registry.histogram("journal.write.latency")
405
private val writeCounter = registry.counter("journal.write.count")
406
private val errorCounter = registry.counter("journal.write.errors")
407
408
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
409
val startTime = System.nanoTime()
410
411
writeToBackend(messages).andThen {
412
case Success(_) =>
413
writeLatencyHistogram.update(System.nanoTime() - startTime)
414
writeCounter.inc(messages.map(_.size).sum)
415
case Failure(_) =>
416
errorCounter.inc()
417
}
418
}
419
}
420
```
421
422
### Plugin Migration Support
423
424
```scala
425
class MigrationJournal extends AsyncWriteJournal {
426
private val oldJournal: ActorRef = context.actorOf(Props[OldJournalPlugin])
427
private val newJournal: ActorRef = context.actorOf(Props[NewJournalPlugin])
428
429
override def asyncReplayMessages(
430
persistenceId: String,
431
fromSequenceNr: Long,
432
toSequenceNr: Long,
433
max: Long
434
)(recoveryCallback: PersistentRepr => Unit): Future[Unit] = {
435
// Try new storage first, fallback to old storage
436
newJournal.ask(ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId)).flatMap {
437
case messages if messages.nonEmpty => Future.successful(())
438
case _ => oldJournal.ask(ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId))
439
}
440
}
441
}
442
```
443
444
## Testing Plugin Implementations
445
446
### Journal Plugin Testing
447
448
```scala
449
import akka.persistence.journal.JournalSpec
450
451
class CustomJournalSpec extends JournalSpec(
452
config = ConfigFactory.parseString(
453
"""
454
akka.persistence.journal.plugin = "custom-journal"
455
custom-journal {
456
class = "com.example.CustomJournal"
457
# Test configuration
458
}
459
"""
460
)
461
) {
462
"Custom journal" should {
463
"pass all journal compliance tests" in {
464
// Tests provided by JournalSpec
465
}
466
}
467
}
468
```
469
470
### Snapshot Store Testing
471
472
```scala
473
import akka.persistence.snapshot.SnapshotStoreSpec
474
475
class CustomSnapshotStoreSpec extends SnapshotStoreSpec(
476
config = ConfigFactory.parseString(
477
"""
478
akka.persistence.snapshot-store.plugin = "custom-snapshot-store"
479
custom-snapshot-store {
480
class = "com.example.CustomSnapshotStore"
481
# Test configuration
482
}
483
"""
484
)
485
) {
486
"Custom snapshot store" should {
487
"pass all snapshot store compliance tests" in {
488
// Tests provided by SnapshotStoreSpec
489
}
490
}
491
}
492
```
493
494
## Plugin Deployment
495
496
### Packaging as SBT Plugin
497
498
```scala
499
// build.sbt
500
ThisBuild / organization := "com.example"
501
ThisBuild / scalaVersion := "2.13.10"
502
503
lazy val akkaVersion = "2.8.8"
504
505
libraryDependencies ++= Seq(
506
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
507
"com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion % Test
508
)
509
510
// Make plugin discoverable
511
resourceDirectories in Compile += (sourceDirectory in Compile).value / "resources"
512
```
513
514
### Reference Configuration
515
516
```hocon
517
# reference.conf - default plugin configuration
518
my-persistence-plugin {
519
journal {
520
class = "com.example.MyJournal"
521
# Default settings
522
}
523
524
snapshot-store {
525
class = "com.example.MySnapshotStore"
526
# Default settings
527
}
528
}
529
```
530
531
## Performance Optimization
532
533
### Batching Strategies
534
535
```scala
536
class BatchingJournal extends AsyncWriteJournal {
537
private val batcher = new MessageBatcher(batchSize = 100, flushInterval = 50.millis)
538
539
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
540
// Batch multiple writes for better throughput
541
batcher.addBatch(messages).map { batches =>
542
batches.flatMap(writeBatchToStorage)
543
}
544
}
545
}
546
```
547
548
### Connection Management
549
550
```scala
551
class PooledJournal extends AsyncWriteJournal {
552
private val connectionPool = new HikariConnectionPool(config)
553
554
override def postStop(): Unit = {
555
connectionPool.close()
556
super.postStop()
557
}
558
559
private def withConnection[T](f: Connection => T): Future[T] = {
560
Future {
561
val connection = connectionPool.getConnection()
562
try f(connection)
563
finally connection.close()
564
}
565
}
566
}
567
```