# Untyped Query API

The untyped (original) query API provides standardized interfaces for querying persisted events and persistence IDs. These traits define optional capabilities that read journal implementations may support.

## Capabilities

### Base Read Journal

Base marker trait for all Scala read journal implementations.

```scala { .api }
/**
 * API for reading persistent events and information derived from stored persistent events.
 *
 * The purpose of the API is not to enforce compatibility between different
 * journal implementations, because the technical capabilities may be very different.
 * The interface is very open so that different journals may implement specific queries.
 */
trait ReadJournal
```
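
The usage examples in this document obtain their journal from a `getMyReadJournal()` placeholder. A minimal sketch of one way to fill it in, assuming the plugin is registered with Akka's `PersistenceQuery` extension. The plugin id `my-read-journal` and the `ReadJournalLookup` object are hypothetical names chosen for illustration:

```scala
import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.{EventsByPersistenceIdQuery, ReadJournal}

object ReadJournalLookup {
  // Hypothetical plugin id: the configuration path of your read journal plugin.
  val PluginId = "my-read-journal"

  // Looks up the plugin and exposes it with the capability the caller needs.
  def lookup(system: ActorSystem): ReadJournal with EventsByPersistenceIdQuery =
    PersistenceQuery(system)
      .readJournalFor[ReadJournal with EventsByPersistenceIdQuery](PluginId)
}
```

The type parameter decides which capabilities the caller can see. Requesting a trait the plugin does not implement is not checked at lookup time and is likely to surface as a `ClassCastException` at the first call, so request only what the plugin documents.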

### Events by Persistence ID

Query events for a specific persistent actor by its persistence ID.

```scala { .api }
/**
 * A plugin may optionally support this query by implementing this trait.
 */
trait EventsByPersistenceIdQuery extends ReadJournal {
  /**
   * Query events for a specific PersistentActor identified by persistenceId.
   *
   * You can retrieve a subset of all events by specifying fromSequenceNr and toSequenceNr
   * or use 0L and Long.MaxValue respectively to retrieve all events. The query will
   * return all the events inclusive of the fromSequenceNr and toSequenceNr values.
   *
   * The returned event stream should be ordered by sequence number.
   *
   * The stream is not completed when it reaches the end of the currently stored events,
   * but it continues to push new events when new events are persisted.
   */
  def eventsByPersistenceId(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long
  ): Source[EventEnvelope, NotUsed]
}
```

**Usage Examples:**

```scala
import akka.NotUsed
import akka.persistence.query.EventEnvelope
import akka.persistence.query.scaladsl.{EventsByPersistenceIdQuery, ReadJournal}
import akka.stream.scaladsl.{Sink, Source}

// Implement the query trait
class MyReadJournal extends ReadJournal with EventsByPersistenceIdQuery {
  def eventsByPersistenceId(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long
  ): Source[EventEnvelope, NotUsed] = {
    // Implementation specific logic
    Source.empty
  }
}

// Use the query. getMyReadJournal() and processEvent are placeholders for your own code;
// running a stream also requires an implicit ActorSystem (or Materializer) in scope.
val readJournal: EventsByPersistenceIdQuery = getMyReadJournal()

// Query all events for a persistence ID (live stream)
readJournal
  .eventsByPersistenceId("user-123", 0L, Long.MaxValue)
  .runWith(Sink.foreach { envelope =>
    println(s"Event: ${envelope.event} at sequence ${envelope.sequenceNr}")
  })

// Query specific sequence number range
readJournal
  .eventsByPersistenceId("order-456", 10L, 50L)
  .runForeach { envelope =>
    processEvent(envelope.event)
  }
```

### Current Events by Persistence ID

Query current (finite) events for a specific persistent actor.

```scala { .api }
/**
 * A plugin may optionally support this query by implementing this trait.
 */
trait CurrentEventsByPersistenceIdQuery extends ReadJournal {
  /**
   * Same type of query as EventsByPersistenceIdQuery#eventsByPersistenceId but the event stream
   * is completed immediately when it reaches the end of the currently stored events.
   */
  def currentEventsByPersistenceId(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long
  ): Source[EventEnvelope, NotUsed]
}
```
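
Because the current query completes, its result can be folded into a single value, which a live query would never deliver. A minimal sketch, assuming Akka 2.6 or later (where an implicit `ActorSystem` is enough to run a stream); `EventCount` and `countEvents` are hypothetical names:

```scala
import akka.actor.ActorSystem
import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
import scala.concurrent.Future

object EventCount {
  // Hypothetical helper: counts the events currently stored for one persistence ID.
  def countEvents(journal: CurrentEventsByPersistenceIdQuery, persistenceId: String)(
      implicit system: ActorSystem): Future[Long] =
    journal
      .currentEventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
      // The folded value becomes available only when the stream completes.
      .runFold(0L)((count, _) => count + 1)
}
```

The same fold applied to `eventsByPersistenceId` with an open upper bound would leave the `Future` pending, since that stream stays open for new events.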

**Usage Examples:**

```scala
import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
import scala.util.{Failure, Success}

// Query current events (finite stream).
// onComplete requires an implicit ExecutionContext in scope.
val readJournal: CurrentEventsByPersistenceIdQuery = getMyReadJournal()

readJournal
  .currentEventsByPersistenceId("user-123", 0L, Long.MaxValue)
  .runForeach { envelope =>
    println(s"Historical event: ${envelope.event}")
  }
  .onComplete {
    case Success(_) => println("Finished processing historical events")
    case Failure(ex) => println(s"Failed: $ex")
  }
```

### Events by Tag

Query events that have a specific tag across all persistence IDs.

```scala { .api }
/**
 * A plugin may optionally support this query by implementing this trait.
 */
trait EventsByTagQuery extends ReadJournal {
  /**
   * Query events that have a specific tag. A tag can for example correspond to an
   * aggregate root type (in DDD terminology).
   *
   * The consumer can keep track of its current position in the event stream by storing the
   * offset and restart the query from a given offset after a crash/restart.
   *
   * The returned event stream should be ordered by offset if possible, but this can also be
   * difficult to fulfill for a distributed data store. The order must be documented by the
   * read journal plugin.
   *
   * The stream is not completed when it reaches the end of the currently stored events,
   * but it continues to push new events when new events are persisted.
   */
  def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}
```

**Usage Examples:**

```scala
import akka.persistence.query.scaladsl.EventsByTagQuery
import akka.persistence.query.{NoOffset, Sequence}

val readJournal: EventsByTagQuery = getMyReadJournal()

// Query events by tag from beginning (live stream)
readJournal
  .eventsByTag("user-events", NoOffset)
  .runForeach { envelope =>
    println(s"Tagged event: ${envelope.event} from ${envelope.persistenceId}")
  }

// Resume from specific offset
val lastProcessedOffset = Sequence(1000L)
readJournal
  .eventsByTag("order-events", lastProcessedOffset)
  .runForeach { envelope =>
    processTaggedEvent(envelope)
    saveOffset(envelope.offset) // Save for resumption
  }
```

### Current Events by Tag

Query current (finite) events that have a specific tag.

```scala { .api }
/**
 * A plugin may optionally support this query by implementing this trait.
 */
trait CurrentEventsByTagQuery extends ReadJournal {
  /**
   * Same type of query as EventsByTagQuery#eventsByTag but the event stream
   * is completed immediately when it reaches the end of the currently stored events.
   */
  def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}
```

**Usage Examples:**

```scala
import akka.persistence.query.NoOffset
import akka.persistence.query.scaladsl.CurrentEventsByTagQuery
import scala.util.{Failure, Success}

val readJournal: CurrentEventsByTagQuery = getMyReadJournal()

// Process all current events with tag (finite stream)
readJournal
  .currentEventsByTag("batch-process", NoOffset)
  .runForeach { envelope =>
    processBatchEvent(envelope.event)
  }
  .onComplete {
    case Success(_) => println("Batch processing complete")
    case Failure(ex) => println(s"Batch processing failed: $ex")
  }
```

### Persistence IDs Query

Query all persistence IDs in the journal.

```scala { .api }
/**
 * A plugin may optionally support this query by implementing this trait.
 */
trait PersistenceIdsQuery extends ReadJournal {
  /**
   * Query all PersistentActor identifiers, i.e. as defined by the
   * persistenceId of the PersistentActor.
   *
   * The stream is not completed when it reaches the end of the currently used persistenceIds,
   * but it continues to push new persistenceIds when new persistent actors are created.
   */
  def persistenceIds(): Source[String, NotUsed]
}
```

**Usage Examples:**

```scala
import akka.persistence.query.scaladsl.{EventsByPersistenceIdQuery, PersistenceIdsQuery}

// The journal must support both queries; the compound type avoids an unchecked cast
val readJournal: PersistenceIdsQuery with EventsByPersistenceIdQuery = getMyReadJournal()

// Get all persistence IDs (live stream)
readJournal
  .persistenceIds()
  .runForeach { persistenceId =>
    println(s"Found persistence ID: $persistenceId")

    // Query events for each persistence ID
    readJournal
      .eventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
      .take(10) // Limit to first 10 events; a live query stays open until 10 events exist
      .runForeach { envelope =>
        println(s"  Event: ${envelope.event}")
      }
  }
```

### Current Persistence IDs Query

Query all current (finite) persistence IDs in the journal.

```scala { .api }
/**
 * A plugin may optionally support this query by implementing this trait.
 */
trait CurrentPersistenceIdsQuery extends ReadJournal {
  /**
   * Same type of query as PersistenceIdsQuery#persistenceIds but the stream
   * is completed immediately when it reaches the end of the currently used persistenceIds.
   */
  def currentPersistenceIds(): Source[String, NotUsed]
}
```

**Usage Examples:**

```scala
import akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery
import scala.util.{Failure, Success}

val readJournal: CurrentPersistenceIdsQuery = getMyReadJournal()

// Get all current persistence IDs (finite stream)
readJournal
  .currentPersistenceIds()
  .runForeach { persistenceId =>
    println(s"Current persistence ID: $persistenceId")
  }
  .onComplete {
    case Success(_) => println("Finished scanning persistence IDs")
    case Failure(ex) => println(s"Scan failed: $ex")
  }
```

### Paged Persistence IDs Query

Query persistence IDs with pagination support for large datasets.

```scala { .api }
/**
 * A plugin ReadJournal may optionally support this query by implementing this trait.
 */
trait PagedPersistenceIdsQuery extends ReadJournal {
  /**
   * Get the current persistence ids.
   *
   * Not all plugins may support in database paging, and may simply use drop/take Akka streams operators
   * to manipulate the result set according to the paging parameters.
   *
   * @param afterId The ID to start returning results from, or None to return all ids. This should be an id
   *                returned from a previous invocation of this command. Callers should not assume that ids are
   *                returned in sorted order.
   * @param limit The maximum results to return. Use Long.MaxValue to return all results. Must be greater than zero.
   * @return A source containing all the persistence ids, limited as specified.
   */
  def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]
}
```
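
The drop/take fallback mentioned in the comment above can be modelled on a plain in-memory sequence. This is only a sketch of the paging parameters, not any plugin's implementation: `PagingSketch` and `page` are hypothetical names, and it assumes `afterId` is exclusive (the page starts just past the given ID), which a particular plugin may or may not follow.

```scala
object PagingSketch {
  // In-memory stand-in for a journal without native paging (hypothetical helper).
  def page(allIds: Seq[String], afterId: Option[String], limit: Long): Seq[String] = {
    require(limit > 0, "limit must be greater than zero")
    // Assumption: afterId is exclusive, so skip up to and including that id.
    val remaining = afterId match {
      case Some(id) => allIds.dropWhile(_ != id).drop(1)
      case None     => allIds
    }
    // Seq#take accepts an Int, so clamp the Long limit before converting.
    remaining.take(math.min(limit, Int.MaxValue.toLong).toInt)
  }
}
```

Note that the ordering of `allIds` is whatever the store returns. As the API comment says, callers should not rely on the IDs being sorted, only on passing back an ID they received earlier.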

**Usage Examples:**

```scala
import akka.Done
import akka.persistence.query.scaladsl.PagedPersistenceIdsQuery
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// Requires an implicit ActorSystem and ExecutionContext in scope
val readJournal: PagedPersistenceIdsQuery = getMyReadJournal()

// Get first page of persistence IDs
readJournal
  .currentPersistenceIds(None, 100)
  .runWith(Sink.seq)
  .flatMap { firstPage =>
    println(s"First page: ${firstPage.mkString(", ")}")

    // Get next page starting from last ID; flatMap keeps the second query's Future
    firstPage.lastOption match {
      case Some(lastId) =>
        readJournal
          .currentPersistenceIds(Some(lastId), 100)
          .runForeach { nextId =>
            println(s"Next page ID: $nextId")
          }
      case None =>
        Future.successful(Done)
    }
  }

// Get all persistence IDs with unlimited results
readJournal
  .currentPersistenceIds(None, Long.MaxValue)
  .runForeach { persistenceId =>
    println(s"Persistence ID: $persistenceId")
  }
```

## Types

```scala { .api }
import akka.NotUsed
import akka.stream.scaladsl.Source

trait Product4[T1, T2, T3, T4] {
  def _1: T1
  def _2: T2
  def _3: T3
  def _4: T4
}

case object NotUsed
```

## Query Composition

### Multiple Query Traits

Combine multiple query capabilities in a single read journal:

```scala
import akka.NotUsed
import akka.persistence.query.{EventEnvelope, Offset}
import akka.persistence.query.scaladsl._
import akka.stream.scaladsl.Source

class ComprehensiveReadJournal extends ReadJournal
  with EventsByPersistenceIdQuery
  with CurrentEventsByPersistenceIdQuery
  with EventsByTagQuery
  with CurrentEventsByTagQuery
  with PersistenceIdsQuery
  with CurrentPersistenceIdsQuery {

  // Implement all query methods (explicit result types keep the signatures from being inferred as Nothing)
  def eventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Source[EventEnvelope, NotUsed] = ???
  def currentEventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Source[EventEnvelope, NotUsed] = ???
  def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = ???
  def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = ???
  def persistenceIds(): Source[String, NotUsed] = ???
  def currentPersistenceIds(): Source[String, NotUsed] = ???
}
```

### Capability Detection

Check if a read journal supports specific query capabilities:

```scala
def checkCapabilities(readJournal: ReadJournal): Unit = {
  readJournal match {
    case _: EventsByPersistenceIdQuery =>
      println("Supports events by persistence ID queries")
    case _ =>
      println("Does not support events by persistence ID queries")
  }

  readJournal match {
    case _: EventsByTagQuery =>
      println("Supports events by tag queries")
    case _ =>
      println("Does not support events by tag queries")
  }

  readJournal match {
    case _: PersistenceIdsQuery =>
      println("Supports persistence IDs queries")
    case _ =>
      println("Does not support persistence IDs queries")
  }
}
```
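
When the caller wants to use the capability rather than only report it, the same type test can return the journal already narrowed to the requested trait. A minimal sketch; `Capabilities` and `capability` are hypothetical names, and the `ClassTag` context bound is what makes the type pattern checkable at runtime:

```scala
import akka.persistence.query.scaladsl.ReadJournal
import scala.reflect.ClassTag

object Capabilities {
  // Returns the journal viewed as capability Q, or None if it does not implement Q.
  def capability[Q <: ReadJournal: ClassTag](journal: ReadJournal): Option[Q] =
    journal match {
      case q: Q => Some(q) // checked through the implicit ClassTag[Q]
      case _    => None
    }
}
```

A call such as `Capabilities.capability[EventsByTagQuery](journal)` then yields an `Option` that can be mapped over, which avoids a separate `asInstanceOf` after the check.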
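
## Error Handling

### Query Failures

Handle failures in query streams. `recoverWithRetries` switches to a fresh query when the stream fails with a matching exception, while failures in per-event processing are caught inside the sink so that one bad event does not stop the stream:

```scala
// Assumption: the plugin signals timeouts with java.util.concurrent.TimeoutException
import java.util.concurrent.TimeoutException
import scala.util.control.NonFatal

readJournal
  .eventsByTag("user-events", offset)
  .recoverWithRetries(attempts = 3, {
    case _: TimeoutException =>
      println("Query timed out, restarting...")
      // Restarts from the original offset, so events already seen are delivered again
      readJournal.eventsByTag("user-events", offset)
  })
  .runForeach { envelope =>
    try {
      processEvent(envelope.event)
    } catch {
      case NonFatal(ex) =>
        println(s"Failed to process event: $ex")
    }
  }
```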
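
For long-running consumers, restarting with backoff and resuming from the last offset seen is often a better fit than restarting from the original offset. A minimal sketch using Akka Streams' `RestartSource` (this assumes Akka 2.6.10 or later for `RestartSettings`); `ResumableTagQuery`, `resumable`, and the backoff values are hypothetical choices, and the offset is tracked in memory only:

```scala
import akka.NotUsed
import akka.persistence.query.{EventEnvelope, Offset}
import akka.persistence.query.scaladsl.EventsByTagQuery
import akka.stream.RestartSettings
import akka.stream.scaladsl.{RestartSource, Source}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._

object ResumableTagQuery {
  // Builds a source that re-issues the query after a failure or completion,
  // starting from the offset of the last envelope that passed through.
  // The tracked offset belongs to one call of this method, so build a new
  // source for every stream you run.
  def resumable(journal: EventsByTagQuery, tag: String, start: Offset): Source[EventEnvelope, NotUsed] = {
    val lastSeen = new AtomicReference[Offset](start)
    val settings = RestartSettings(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2)

    RestartSource.withBackoff(settings) { () =>
      journal
        .eventsByTag(tag, lastSeen.get())
        .map { envelope =>
          lastSeen.set(envelope.offset) // remember progress for the next restart
          envelope
        }
    }
  }
}
```

Whether the resumed query includes or excludes the event at the stored offset depends on the plugin, so consumers may still need to tolerate a duplicate. Surviving a process restart additionally requires persisting the offset, as the `saveOffset` call in the Events by Tag example suggests.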

### Offset Validation

Validate offsets before using them in queries:

```scala
import akka.NotUsed
import akka.persistence.query.{EventEnvelope, NoOffset, Offset, Sequence, TimeBasedUUID, TimestampOffset}
import akka.stream.scaladsl.Source
import java.time.Instant

def validateOffset(offset: Offset): Boolean = {
  offset match {
    case NoOffset => true
    case Sequence(value) => value >= 0
    case _: TimeBasedUUID => true // its constructor already rejects non-time-based UUIDs
    case TimestampOffset(timestamp, _, _) => !timestamp.isBefore(Instant.EPOCH)
    case _ => false
  }
}

def safeEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
  if (validateOffset(offset)) {
    readJournal.eventsByTag(tag, offset)
  } else {
    Source.failed(new IllegalArgumentException(s"Invalid offset: $offset"))
  }
}
```

## Java API

All Scala query traits have corresponding Java API equivalents in the `javadsl` package:

```java
// Java API usage
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.javadsl.*;

// Get Java read journal
ReadJournal readJournal = PersistenceQuery.get(system)
    .getReadJournalFor(MyJavaReadJournal.class, "my-journal");

// Check capabilities and use
if (readJournal instanceof EventsByPersistenceIdQuery) {
    EventsByPersistenceIdQuery query = (EventsByPersistenceIdQuery) readJournal;
    query.eventsByPersistenceId("user-123", 0L, Long.MAX_VALUE)
        .runForeach(envelope -> {
            System.out.println("Event: " + envelope.event());
        }, system);
}
```

## Types

```scala { .api }
import akka.stream.scaladsl.Source
import akka.NotUsed
import scala.concurrent.Future
import scala.util.{Success, Failure}

trait TimeoutException extends Exception
```