0
# Journal Implementations
1
2
Akka Persistence Query provides concrete read journal implementations for specific storage backends. These implementations demonstrate how to build read journals and provide working examples for common use cases.
3
4
## Capabilities
5
6
### LevelDB Read Journal (Deprecated)
7
8
Reference implementation using LevelDB storage backend. Note that this implementation is deprecated as of Akka 2.6.15.
9
10
```scala { .api }
11
/**
12
* LevelDB read journal implementation.
13
* @deprecated Use another journal implementation since 2.6.15
14
*/
15
class LeveldbReadJournal extends ReadJournal
16
with PersistenceIdsQuery
17
with CurrentPersistenceIdsQuery
18
with EventsByPersistenceIdQuery
19
with CurrentEventsByPersistenceIdQuery
20
with EventsByTagQuery
21
with CurrentEventsByTagQuery {
22
23
def persistenceIds(): Source[String, NotUsed]
24
def currentPersistenceIds(): Source[String, NotUsed]
25
26
def eventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Source[EventEnvelope, NotUsed]
27
def currentEventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Source[EventEnvelope, NotUsed]
28
29
def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
30
def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
31
}
32
```
33
34
### LevelDB Constants and Factory
35
36
Factory methods and constants for LevelDB read journal.
37
38
```scala { .api }
39
/**
40
* LevelDB read journal companion object with plugin identifier.
41
* @deprecated Use another journal implementation since 2.6.15
42
*/
43
object LeveldbReadJournal {
44
/** Plugin identifier for configuration */
45
val Identifier = "akka.persistence.query.journal.leveldb"
46
}
47
```
48
49
**Usage Examples:**
50
51
```scala
52
import akka.persistence.query.PersistenceQuery
53
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
54
55
// Note: LevelDB implementation is deprecated
56
val readJournal = PersistenceQuery(system)
57
.readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
58
59
// Query all persistence IDs
60
readJournal
61
.persistenceIds()
62
.take(10)
63
.runForeach { persistenceId =>
64
println(s"Found persistence ID: $persistenceId")
65
}
66
67
// Query events by persistence ID
68
readJournal
69
.eventsByPersistenceId("user-123", 0L, Long.MaxValue)
70
.runForeach { envelope =>
71
println(s"Event: ${envelope.event} at sequence ${envelope.sequenceNr}")
72
}
73
74
// Query events by tag
75
readJournal
76
.eventsByTag("user-events", NoOffset)
77
.runForeach { envelope =>
78
println(s"Tagged event: ${envelope.event} from ${envelope.persistenceId}")
79
}
80
81
// Query current (finite) events
82
readJournal
83
.currentEventsByTag("batch-events", NoOffset)
84
.runForeach { envelope =>
85
processBatchEvent(envelope.event)
86
}
87
.onComplete {
88
case Success(_) => println("Batch processing complete")
89
case Failure(ex) => println(s"Batch processing failed: $ex")
90
}
91
```
92
93
### LevelDB Read Journal Provider
94
95
Provider implementation for LevelDB read journal plugin.
96
97
```scala { .api }
98
/**
99
* LevelDB read journal provider implementation.
100
* @deprecated Use another journal implementation since 2.6.15
101
*/
102
class LeveldbReadJournalProvider extends ReadJournalProvider {
103
def scaladslReadJournal(): scaladsl.ReadJournal
104
def javadslReadJournal(): javadsl.ReadJournal
105
}
106
```
107
108
### Events by Slice Firehose Query
109
110
Advanced implementation providing slice-based querying with multiple query capabilities.
111
112
```scala { .api }
113
/**
114
* Firehose implementation for slice-based event queries.
115
* Provides high-performance event streaming with slice-based distribution.
116
*/
117
class EventsBySliceFirehoseQuery extends ReadJournal
118
with EventsBySliceQuery
119
with EventsBySliceStartingFromSnapshotsQuery
120
with EventTimestampQuery
121
with LoadEventQuery {
122
123
// EventsBySliceQuery methods
124
def eventsBySlices[Event](entityType: String, minSlice: Int, maxSlice: Int, offset: Offset): Source[EventEnvelope[Event], NotUsed]
125
def sliceForPersistenceId(persistenceId: String): Int
126
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
127
128
// EventsBySliceStartingFromSnapshotsQuery methods
129
def eventsBySlicesStartingFromSnapshots[Snapshot, Event](
130
entityType: String,
131
minSlice: Int,
132
maxSlice: Int,
133
offset: Offset,
134
transformSnapshot: Snapshot => Event
135
): Source[EventEnvelope[Event], NotUsed]
136
137
// EventTimestampQuery methods
138
def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]]
139
140
// LoadEventQuery methods
141
def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): Future[EventEnvelope[Event]]
142
}
143
```
144
145
### Firehose Constants and Factory
146
147
Constants and factory methods for the firehose implementation.
148
149
```scala { .api }
150
/**
151
* Firehose query companion object with plugin identifier.
152
*/
153
object EventsBySliceFirehoseQuery {
154
/** Plugin identifier for configuration */
155
val Identifier = "akka.persistence.query.events-by-slice-firehose"
156
}
157
```
158
159
**Usage Examples:**
160
161
```scala
162
import akka.persistence.query.PersistenceQuery
163
import akka.persistence.query.typed.scaladsl.EventsBySliceFirehoseQuery
164
165
// Get firehose read journal
166
val firehoseJournal = PersistenceQuery(system)
167
.readJournalFor[EventsBySliceFirehoseQuery](EventsBySliceFirehoseQuery.Identifier)
168
169
// Query events by slice range
170
firehoseJournal
171
.eventsBySlices[DomainEvent]("User", 0, 127, offset)
172
.runForeach { envelope =>
173
println(s"Event from slice ${envelope.slice}: ${envelope.event}")
174
println(s"Entity type: ${envelope.entityType}")
175
println(s"Tags: ${envelope.tags}")
176
}
177
178
// Distribute processing across slices
179
val sliceRanges = firehoseJournal.sliceRanges(8)
180
sliceRanges.zipWithIndex.foreach { case (range, processorId) =>
181
println(s"Starting processor $processorId for slices ${range.start}-${range.end}")
182
183
firehoseJournal
184
.eventsBySlices[DomainEvent]("Order", range.start, range.end, offset)
185
.runForeach { envelope =>
186
processInProcessor(processorId, envelope)
187
}
188
}
189
190
// Load specific events on demand
191
firehoseJournal
192
.loadEnvelope[UserEvent]("user-123", 42L)
193
.foreach { envelope =>
194
println(s"Loaded event: ${envelope.event}")
195
}
196
197
// Query event timestamps
198
firehoseJournal
199
.timestampOf("user-123", 42L)
200
.foreach {
201
case Some(timestamp) => println(s"Event timestamp: $timestamp")
202
case None => println("Event not found")
203
}
204
205
// Query with snapshot integration
206
firehoseJournal
207
.eventsBySlicesStartingFromSnapshots[UserSnapshot, UserEvent](
208
entityType = "User",
209
minSlice = 0,
210
maxSlice = 255,
211
offset = TimestampOffset.Zero,
212
transformSnapshot = snapshot => UserSnapshotRestored(snapshot.userId, snapshot.state)
213
)
214
.runForeach { envelope =>
215
envelope.event match {
216
case UserSnapshotRestored(userId, state) =>
217
println(s"Restored snapshot for $userId")
218
initializeUserState(userId, state)
219
case regularEvent =>
220
processUserEvent(regularEvent)
221
}
222
}
223
```
224
225
### Firehose Read Journal Provider
226
227
Provider implementation for the firehose read journal.
228
229
```scala { .api }
230
/**
231
* Provider for EventsBySliceFirehose query implementation.
232
*/
233
class EventsBySliceFirehoseReadJournalProvider extends ReadJournalProvider {
234
def scaladslReadJournal(): scaladsl.ReadJournal
235
def javadslReadJournal(): javadsl.ReadJournal
236
}
237
```
238
239
## Configuration
240
241
### LevelDB Configuration
242
243
Example configuration for LevelDB read journal (deprecated):
244
245
```hocon
246
# LevelDB read journal configuration (deprecated)
247
akka.persistence.query.journal.leveldb {
248
# Implementation class
249
class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"
250
251
# Reference to the write journal plugin
252
write-plugin = "akka.persistence.journal.leveldb"
253
254
# Directory where journal files are stored
255
dir = "target/journal"
256
257
# Maximum number of events to buffer
258
max-buffer-size = 100
259
260
# Refresh interval for live queries
261
refresh-interval = 1s
262
}
263
```
264
265
### Firehose Configuration
266
267
Example configuration for firehose implementation:
268
269
```hocon
270
# Firehose read journal configuration
271
akka.persistence.query.events-by-slice-firehose {
272
# Implementation class
273
class = "akka.persistence.query.typed.EventsBySliceFirehoseReadJournalProvider"
274
275
# Number of slices for horizontal partitioning
276
number-of-slices = 1024
277
278
# Batch size for event retrieval
279
batch-size = 1000
280
281
# Refresh interval for live queries
282
refresh-interval = 500ms
283
284
# Event loading configuration
285
event-loading {
286
# Timeout for loading individual events
287
timeout = 10s
288
289
# Parallelism for concurrent event loading
290
parallelism = 4
291
}
292
}
293
```
294
295
## Implementation Patterns
296
297
### Custom Read Journal Implementation
298
299
Example of implementing a custom read journal:
300
301
```scala
302
class CustomReadJournal(system: ExtendedActorSystem, config: Config)
303
extends ReadJournal
304
with EventsByPersistenceIdQuery
305
with EventsByTagQuery
306
with PersistenceIdsQuery {
307
308
private val backend = new CustomJournalBackend(config)
309
310
override def eventsByPersistenceId(
311
persistenceId: String,
312
fromSequenceNr: Long,
313
toSequenceNr: Long
314
): Source[EventEnvelope, NotUsed] = {
315
Source
316
.unfoldAsync(fromSequenceNr) { seqNr =>
317
if (seqNr > toSequenceNr) {
318
Future.successful(None)
319
} else {
320
backend.loadEvent(persistenceId, seqNr).map {
321
case Some(event) =>
322
val envelope = EventEnvelope(
323
offset = Sequence(seqNr),
324
persistenceId = persistenceId,
325
sequenceNr = seqNr,
326
event = event,
327
timestamp = System.currentTimeMillis()
328
)
329
Some((seqNr + 1, envelope))
330
case None =>
331
None
332
}
333
}
334
}
335
}
336
337
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
338
val startOffset = offset match {
339
case Sequence(value) => value
340
case NoOffset => 0L
341
case _ => throw new IllegalArgumentException(s"Unsupported offset type: $offset")
342
}
343
344
Source
345
.unfoldAsync(startOffset) { currentOffset =>
346
backend.loadEventsByTag(tag, currentOffset, batchSize = 100).map { events =>
347
if (events.nonEmpty) {
348
val lastOffset = events.last.sequenceNr
349
Some((lastOffset + 1, events))
350
} else {
351
// No more events, but this is a live query so we continue
352
Thread.sleep(1000) // Simple polling - real implementation would use more sophisticated approach
353
Some((currentOffset, List.empty))
354
}
355
}
356
}
357
.mapConcat(identity)
358
}
359
360
override def persistenceIds(): Source[String, NotUsed] = {
361
Source
362
.unfoldAsync(Option.empty[String]) { afterId =>
363
backend.loadPersistenceIds(afterId, limit = 100).map { ids =>
364
if (ids.nonEmpty) {
365
Some((ids.lastOption, ids))
366
} else {
367
// No more IDs, but this is a live query
368
Thread.sleep(1000)
369
Some((afterId, List.empty))
370
}
371
}
372
}
373
.mapConcat(identity)
374
}
375
}
376
377
// Custom provider
378
class CustomReadJournalProvider extends ReadJournalProvider {
379
override def scaladslReadJournal(): scaladsl.ReadJournal = {
380
new CustomReadJournal(system, config)
381
}
382
383
override def javadslReadJournal(): javadsl.ReadJournal = {
384
new CustomJavaReadJournal(scaladslReadJournal())
385
}
386
}
387
```
388
389
### Plugin Registration
390
391
Register custom journal implementation:
392
393
```hocon
394
# Custom read journal plugin
395
my-custom-journal {
396
class = "com.example.CustomReadJournalProvider"
397
398
# Custom configuration
399
connection-string = "jdbc:postgresql://localhost/mydb"
400
batch-size = 1000
401
refresh-interval = 1s
402
}
403
```
404
405
Usage:
406
407
```scala
408
val customJournal = PersistenceQuery(system)
409
.readJournalFor[CustomReadJournal]("my-custom-journal")
410
```
411
412
### Java API Implementation
413
414
Java wrapper for custom read journal:
415
416
```java
417
public class CustomJavaReadJournal implements javadsl.ReadJournal,
418
javadsl.EventsByPersistenceIdQuery,
419
javadsl.EventsByTagQuery {
420
421
private final CustomReadJournal scaladslJournal;
422
423
public CustomJavaReadJournal(CustomReadJournal scaladslJournal) {
424
this.scaladslJournal = scaladslJournal;
425
}
426
427
@Override
428
public Source<EventEnvelope, NotUsed> eventsByPersistenceId(
429
String persistenceId,
430
long fromSequenceNr,
431
long toSequenceNr) {
432
return scaladslJournal
433
.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
434
.asJava();
435
}
436
437
@Override
438
public Source<EventEnvelope, NotUsed> eventsByTag(String tag, Offset offset) {
439
return scaladslJournal
440
.eventsByTag(tag, offset)
441
.asJava();
442
}
443
}
444
```
445
446
## Performance Considerations
447
448
### Batch Processing
449
450
Optimize query performance with batching:
451
452
```scala
453
// Batch event loading
454
def loadEventsBatch(persistenceIds: List[String]): Future[List[EventEnvelope]] = {
455
Source(persistenceIds)
456
.mapAsync(parallelism = 4) { persistenceId =>
457
readJournal
458
.currentEventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
459
.take(100) // Limit per persistence ID
460
.runWith(Sink.seq)
461
}
462
.runWith(Sink.seq)
463
.map(_.flatten.toList)
464
}
465
466
// Batch tag queries
467
def loadTaggedEventsBatch(tags: List[String]): Future[List[EventEnvelope]] = {
468
Source(tags)
469
.mapAsync(parallelism = 2) { tag =>
470
readJournal
471
.currentEventsByTag(tag, NoOffset)
472
.take(1000) // Limit per tag
473
.runWith(Sink.seq)
474
}
475
.runWith(Sink.seq)
476
.map(_.flatten.toList)
477
}
478
```
479
480
### Slice Distribution
481
482
Optimize slice-based processing:
483
484
```scala
485
def optimizedSliceProcessing(totalSlices: Int = 1024, processors: Int = 8): Unit = {
486
val slicesPerProcessor = totalSlices / processors
487
488
(0 until processors).foreach { processorId =>
489
val minSlice = processorId * slicesPerProcessor
490
val maxSlice = if (processorId == processors - 1) {
491
totalSlices - 1 // Last processor handles remaining slices
492
} else {
493
(processorId + 1) * slicesPerProcessor - 1
494
}
495
496
println(s"Processor $processorId: slices $minSlice to $maxSlice")
497
498
firehoseJournal
499
.eventsBySlices[DomainEvent]("Entity", minSlice, maxSlice, offset)
500
.buffer(size = 1000, OverflowStrategy.backpressure)
501
.runForeach { envelope =>
502
processEventOptimized(processorId, envelope)
503
}
504
}
505
}
506
```
507
508
## Error Handling
509
510
### Journal Implementation Errors
511
512
Handle errors in custom journal implementations:
513
514
```scala
515
class ResilientCustomReadJournal extends CustomReadJournal {
516
517
override def eventsByPersistenceId(
518
persistenceId: String,
519
fromSequenceNr: Long,
520
toSequenceNr: Long
521
): Source[EventEnvelope, NotUsed] = {
522
523
super.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
524
.recover {
525
case _: TimeoutException =>
526
println(s"Timeout loading events for $persistenceId, retrying...")
527
EventEnvelope.empty // Placeholder
528
case ex: Exception =>
529
println(s"Error loading events for $persistenceId: $ex")
530
throw ex
531
}
532
.filter(_ != EventEnvelope.empty) // Filter out placeholders
533
}
534
}
535
```
536
537
### Plugin Loading Failures
538
539
Handle plugin loading and configuration errors:
540
541
```scala
542
def safeLoadReadJournal[T <: ReadJournal](pluginId: String): Option[T] = {
543
try {
544
Some(PersistenceQuery(system).readJournalFor[T](pluginId))
545
} catch {
546
case _: ClassNotFoundException =>
547
println(s"Plugin class not found for $pluginId")
548
None
549
case _: ConfigurationException =>
550
println(s"Configuration error for plugin $pluginId")
551
None
552
case ex: Exception =>
553
println(s"Failed to load plugin $pluginId: $ex")
554
None
555
}
556
}
557
558
// Usage with fallback
559
val readJournal = safeLoadReadJournal[LeveldbReadJournal](LeveldbReadJournal.Identifier)
560
.getOrElse {
561
println("Falling back to in-memory journal")
562
getInMemoryReadJournal()
563
}
564
```
565
566
## Java API
567
568
Java API for journal implementations:
569
570
```java
571
import akka.persistence.query.PersistenceQuery;
572
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
573
import akka.persistence.query.typed.javadsl.EventsBySliceFirehoseQuery;
574
575
// LevelDB Java API (deprecated)
576
LeveldbReadJournal leveldbJournal = PersistenceQuery.get(system)
577
.getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier);
578
579
// Firehose Java API
580
EventsBySliceFirehoseQuery firehoseJournal = PersistenceQuery.get(system)
581
.getReadJournalFor(EventsBySliceFirehoseQuery.class,
582
EventsBySliceFirehoseQuery.Identifier);
583
584
// Query events by slice
585
firehoseJournal
586
.eventsBySlices(DomainEvent.class, "User", 0, 127, offset)
587
.runForeach(envelope -> {
588
System.out.println("Event: " + envelope.getEvent());
589
System.out.println("Slice: " + envelope.slice());
590
}, system);
591
592
// Load specific event
593
firehoseJournal
594
.loadEnvelope(UserEvent.class, "user-123", 42L)
595
.thenAccept(envelope -> {
596
System.out.println("Loaded: " + envelope.getEvent());
597
});
598
```
599
600
## Types
601
602
```scala { .api }
603
import akka.stream.OverflowStrategy
604
import java.time.Instant
605
import scala.concurrent.Future
606
import akka.stream.scaladsl.{Source, Sink}
607
import akka.NotUsed
608
import com.typesafe.config.Config
609
import akka.actor.ExtendedActorSystem
610
611
case class DomainEvent(eventType: String, data: Map[String, Any])
612
case class UserEvent(userId: String, eventType: String, data: Map[String, Any])
613
case class UserSnapshot(userId: String, state: Map[String, Any])
614
case class UserSnapshotRestored(userId: String, state: Map[String, Any])
615
616
// Custom backend interface
617
trait CustomJournalBackend {
618
def loadEvent(persistenceId: String, sequenceNr: Long): Future[Option[Any]]
619
def loadEventsByTag(tag: String, offset: Long, batchSize: Int): Future[List[EventEnvelope]]
620
def loadPersistenceIds(afterId: Option[String], limit: Long): Future[List[String]]
621
}
622
```