0
# Journal API
1
2
Journal plugin development interfaces for implementing custom persistence backends. These APIs enable developers to create journal plugins that store and retrieve persistent messages with various storage systems.
3
4
## Capabilities
5
6
### AsyncWriteJournal
7
8
Base trait for implementing asynchronous, non-blocking journal plugins.
9
10
```scala { .api }
11
/**
12
* Abstract journal optimized for asynchronous, non-blocking writes
13
*/
14
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
15
16
/**
17
* Plugin API: Asynchronously write messages to the journal.
18
* The returned future must be completed when all AtomicWrite operations are finished.
19
* The returned sequence must have the same size as the input sequence and must contain results
20
* for each AtomicWrite in the same order.
21
*/
22
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]
23
24
/**
25
* Plugin API: Asynchronously delete messages up to the given sequence number.
26
* If `permanent` is false, messages are marked as deleted but not physically removed.
27
*/
28
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
29
}
30
```
31
32
**Usage Examples:**
33
34
```scala
35
import akka.persistence.journal.AsyncWriteJournal
36
import akka.persistence.{AtomicWrite, PersistentRepr}
37
import scala.concurrent.Future
38
import scala.util.{Try, Success, Failure}
39
40
class CustomJournal extends AsyncWriteJournal {
41
42
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
43
// Custom storage implementation
44
Future.traverse(messages) { atomicWrite =>
45
val persistenceId = atomicWrite.persistenceId
46
val batch = atomicWrite.payload
47
48
// Store messages in your backend
49
storeMessages(persistenceId, batch).map(_ => Success(())).recover {
50
case ex => Failure(ex)
51
}
52
}
53
}
54
55
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
56
// Mark messages as deleted in your backend
57
markMessagesDeleted(persistenceId, toSequenceNr)
58
}
59
60
override def asyncReplayMessages(
61
persistenceId: String,
62
fromSequenceNr: Long,
63
toSequenceNr: Long,
64
max: Long
65
)(recoveryCallback: PersistentRepr => Unit): Future[Unit] = {
66
// Replay messages from your backend
67
replayFromStore(persistenceId, fromSequenceNr, toSequenceNr, max) { message =>
68
recoveryCallback(message)
69
}
70
}
71
72
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
73
// Return highest sequence number from your backend
74
getHighestSequenceNr(persistenceId)
75
}
76
77
// Custom implementation methods
78
private def storeMessages(persistenceId: String, messages: immutable.Seq[PersistentRepr]): Future[Unit] = ???
79
private def markMessagesDeleted(persistenceId: String, toSequenceNr: Long): Future[Unit] = ???
80
private def replayFromStore(persistenceId: String, from: Long, to: Long, max: Long)(callback: PersistentRepr => Unit): Future[Unit] = ???
81
private def getHighestSequenceNr(persistenceId: String): Future[Long] = ???
82
}
83
```
84
85
### AsyncRecovery
86
87
Interface for asynchronous message replay and sequence number recovery.
88
89
```scala { .api }
90
/**
91
* Asynchronous message replay and sequence number recovery interface
92
*/
93
trait AsyncRecovery {
94
95
/**
96
* Plugin API: Asynchronously replay persistent messages by calling replayCallback.
97
* Must complete when all messages matching sequence number bounds have been replayed.
98
*/
99
def asyncReplayMessages(
100
persistenceId: String,
101
fromSequenceNr: Long,
102
toSequenceNr: Long,
103
max: Long
104
)(recoveryCallback: PersistentRepr => Unit): Future[Unit]
105
106
/**
107
* Plugin API: Asynchronously read the highest stored sequence number.
108
* Used by persistent actors to determine starting point for new events.
109
*/
110
def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
111
}
112
```
113
114
### WriteJournalBase
115
116
Base functionality for journal implementations including event adapter integration.
117
118
```scala { .api }
119
/**
120
* Base trait providing common journal functionality
121
*/
122
private[akka] trait WriteJournalBase {
123
this: Actor =>
124
125
/** Prepare batch of persistent messages for storage */
126
protected def preparePersistentBatch(rb: immutable.Seq[PersistentEnvelope]): immutable.Seq[AtomicWrite]
127
}
128
```
129
130
### SyncWriteJournal (Deprecated)
131
132
Synchronous journal interface - deprecated in favor of AsyncWriteJournal.
133
134
```scala { .api }
135
/**
136
* Synchronous write journal - DEPRECATED
137
* Use AsyncWriteJournal instead for better performance
138
*/
139
@deprecated("Use AsyncWriteJournal instead", "2.3.4")
140
trait SyncWriteJournal extends Actor with WriteJournalBase with SyncRecovery {
141
142
/** Synchronously write messages - blocks calling thread */
143
def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]]
144
145
/** Synchronously delete messages - blocks calling thread */
146
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit
147
}
148
```
149
150
## Journal Message Types
151
152
### AtomicWrite
153
154
Container for a batch of persistent messages that must be written atomically.
155
156
```scala { .api }
157
/**
158
* Atomic write operation containing messages for single persistence ID
159
*/
160
case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) {
161
/** Persistence ID from first message */
162
def persistenceId: String = payload.head.persistenceId
163
164
/** Lowest sequence number in batch */
165
def lowestSequenceNr: Long = payload.head.sequenceNr
166
167
/** Highest sequence number in batch */
168
def highestSequenceNr: Long = payload.last.sequenceNr
169
170
/** Number of messages in batch */
171
def size: Int = payload.size
172
}
173
```
174
175
### WriteMessages
176
177
Journal protocol message for writing persistent messages.
178
179
```scala { .api }
180
/**
181
* Request to write messages to journal
182
*/
183
case class WriteMessages(
184
messages: immutable.Seq[PersistentEnvelope],
185
persistentActor: ActorRef,
186
actorInstanceId: Int
187
)
188
```
189
190
### ReplayMessages
191
192
Journal protocol message for replaying messages during recovery.
193
194
```scala { .api }
195
/**
196
* Request to replay messages from journal
197
*/
198
case class ReplayMessages(
199
fromSequenceNr: Long,
200
toSequenceNr: Long,
201
max: Long,
202
persistenceId: String,
203
persistentActor: ActorRef
204
)
205
```
206
207
## Error Handling
208
209
### Journal Failures
210
211
```scala { .api }
212
/** Exception indicating journal operation failure */
213
case class JournalFailureException(cause: Throwable) extends RuntimeException(cause)
214
215
/** Response indicating write message failure */
216
case class WriteMessageFailure(cause: Throwable, sequenceNr: Long)
217
218
/** Response indicating replay message failure */
219
case class ReplayMessagesFailure(cause: Throwable)
220
```
221
222
## Plugin Configuration
223
224
Journal plugins are configured in application.conf:
225
226
```hocon
227
akka.persistence.journal {
228
plugin = "custom-journal"
229
230
# Custom journal configuration
231
custom-journal {
232
class = "com.example.CustomJournal"
233
234
# Plugin-specific settings
235
connection-string = "jdbc:postgresql://localhost/events"
236
batch-size = 100
237
238
# Circuit breaker settings
239
circuit-breaker {
240
max-failures = 10
241
call-timeout = 10s
242
reset-timeout = 30s
243
}
244
245
# Replay filter settings
246
replay-filter {
247
mode = "repair-by-discard-old"
248
window-size = 100
249
max-old-writers = 10
250
debug = false
251
}
252
}
253
}
254
```
255
256
## Advanced Features
257
258
### Circuit Breaker Integration
259
260
AsyncWriteJournal includes built-in circuit breaker protection:
261
262
```scala
263
// Circuit breaker configuration
264
circuit-breaker {
265
max-failures = 10 # Number of failures before opening circuit
266
call-timeout = 10s # Timeout for journal operations
267
reset-timeout = 30s # Time before attempting to close circuit
268
}
269
```
270
271
### Replay Filter
272
273
Filters out corrupt or duplicate messages during replay:
274
275
```scala
276
// Replay filter modes
277
replay-filter {
278
mode = "repair-by-discard-old" # repair-by-discard-old, fail, warn, off
279
window-size = 100 # Size of duplicate detection window
280
max-old-writers = 10 # Maximum old writers to track
281
}
282
```
283
284
### Performance Optimization
285
286
```scala
287
class OptimizedJournal extends AsyncWriteJournal {
288
289
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
290
// Batch multiple AtomicWrites for better throughput
291
val batches = messages.grouped(batchSize).toSeq
292
293
Future.traverse(batches) { batch =>
294
// Write batch to storage with single I/O operation
295
writeBatchToStorage(batch)
296
}.map(_.flatten)
297
}
298
299
// Use connection pooling and prepared statements
300
private def writeBatchToStorage(batch: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {
301
connectionPool.withConnection { connection =>
302
val preparedStatement = connection.prepareStatement(insertSQL)
303
// Batch insert for better performance
304
batch.foreach { atomicWrite =>
305
atomicWrite.payload.foreach { repr =>
306
preparedStatement.setString(1, repr.persistenceId)
307
preparedStatement.setLong(2, repr.sequenceNr)
308
preparedStatement.setBytes(3, serialize(repr))
309
preparedStatement.addBatch()
310
}
311
}
312
preparedStatement.executeBatch()
313
}
314
}
315
}
316
```
317
318
## Testing Journal Plugins
319
320
```scala
321
import akka.persistence.journal.JournalSpec
322
323
class CustomJournalSpec extends JournalSpec(config = ConfigFactory.parseString("""
324
akka.persistence.journal.plugin = "custom-journal"
325
custom-journal {
326
class = "com.example.CustomJournal"
327
# Test configuration
328
}
329
""")) {
330
331
"Custom journal" should {
332
"write and replay messages" in {
333
// Test cases provided by JournalSpec
334
}
335
336
"handle concurrent writes" in {
337
// Custom test cases
338
}
339
}
340
}
341
```
342
343
## Migration from Sync to Async
344
345
For migrating from SyncWriteJournal to AsyncWriteJournal:
346
347
```scala
348
// Old sync implementation
349
class OldSyncJournal extends SyncWriteJournal {
350
def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] = {
351
// Blocking operations
352
messages.map(syncWrite)
353
}
354
}
355
356
// New async implementation
357
class NewAsyncJournal extends AsyncWriteJournal {
358
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
359
// Non-blocking operations
360
Future.traverse(messages)(asyncWrite)
361
}
362
}
363
```