0
# Typed Query API
1
2
The typed query API provides enhanced type-safe query interfaces with improved event envelopes and slice-based querying for horizontal scaling. All typed APIs are marked as @ApiMayChange and provide both Scala and Java variants.
3
4
## Capabilities
5
6
### Events by Slice Query
7
8
Query events by entity type and slice range for horizontal scaling and distributed processing.
9
10
```scala { .api }
11
/**
12
* A plugin may optionally support this query by implementing this trait.
13
* API May Change
14
*/
15
trait EventsBySliceQuery extends ReadJournal {
16
/**
17
* Query events for given entity type and slice range. Useful for distributing the load
18
* and implementing resilient query projections.
19
*
20
* @param entityType The entity type to query events for
21
* @param minSlice The minimum slice number (inclusive)
22
* @param maxSlice The maximum slice number (inclusive)
23
* @param offset The offset to start from
24
* @return Source of typed event envelopes
25
*/
26
def eventsBySlices[Event](
27
entityType: String,
28
minSlice: Int,
29
maxSlice: Int,
30
offset: Offset
31
): Source[EventEnvelope[Event], NotUsed]
32
33
/**
34
* Get the slice number for a given persistence ID.
35
* Useful for determining which slice a persistence ID belongs to.
36
*/
37
def sliceForPersistenceId(persistenceId: String): Int
38
39
/**
40
* Get slice ranges for distributing the load across multiple query processors.
41
*
42
* @param numberOfRanges Number of ranges to create
43
* @return Sequence of slice ranges
44
*/
45
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
46
}
47
```
48
49
**Usage Examples:**
50
51
```scala
52
import akka.persistence.query.typed.scaladsl.EventsBySliceQuery
53
import akka.persistence.query.typed.EventEnvelope
54
55
val readJournal: EventsBySliceQuery = getTypedReadJournal()
56
57
// Query events from specific slice range
58
readJournal
59
.eventsBySlices[UserEvent]("User", 0, 127, offset)
60
.runForeach { envelope =>
61
println(s"Event: ${envelope.event} from slice ${envelope.slice}")
62
println(s"Entity type: ${envelope.entityType}")
63
println(s"Tags: ${envelope.tags}")
64
}
65
66
// Distribute processing across multiple slices
67
val sliceRanges = readJournal.sliceRanges(4) // Create 4 ranges
68
sliceRanges.zipWithIndex.foreach { case (range, index) =>
69
println(s"Processor $index handles slices ${range.start} to ${range.end}")
70
71
readJournal
72
.eventsBySlices[UserEvent]("User", range.start, range.end, offset)
73
.runForeach { envelope =>
74
processEventInProcessor(index, envelope)
75
}
76
}
77
78
// Check which slice a persistence ID belongs to
79
val slice = readJournal.sliceForPersistenceId("user-12345")
80
println(s"user-12345 belongs to slice $slice")
81
```
82
83
### Current Events by Slice Query
84
85
Query current (finite) events by entity type and slice range.
86
87
```scala { .api }
88
/**
89
* A plugin may optionally support this query by implementing this trait.
90
* API May Change
91
*/
92
trait CurrentEventsBySliceQuery extends ReadJournal {
93
/**
94
* Same as EventsBySliceQuery#eventsBySlices but the event stream
95
* is completed immediately when it reaches the end of currently stored events.
96
*/
97
def currentEventsBySlices[Event](
98
entityType: String,
99
minSlice: Int,
100
maxSlice: Int,
101
offset: Offset
102
): Source[EventEnvelope[Event], NotUsed]
103
104
def sliceForPersistenceId(persistenceId: String): Int
105
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
106
}
107
```
108
109
**Usage Examples:**
110
111
```scala
112
import akka.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery
113
114
val readJournal: CurrentEventsBySliceQuery = getTypedReadJournal()
115
116
// Process all current events from slice range (finite stream)
117
readJournal
118
.currentEventsBySlices[OrderEvent]("Order", 512, 1023, offset)
119
.runForeach { envelope =>
120
processHistoricalEvent(envelope.event)
121
}
122
.onComplete {
123
case Success(_) => println("Finished processing historical events")
124
case Failure(ex) => println(s"Processing failed: $ex")
125
}
126
```
127
128
### Events by Persistence ID Typed Query
129
130
Query events for a specific persistence ID with typed envelopes.
131
132
```scala { .api }
133
/**
134
* A plugin may optionally support this query by implementing this trait.
135
* API May Change
136
*/
137
trait EventsByPersistenceIdTypedQuery extends ReadJournal {
138
/**
139
* Query events for a specific persistence ID with enhanced typed envelopes.
140
*/
141
def eventsByPersistenceIdTyped[Event](
142
persistenceId: String,
143
fromSequenceNr: Long,
144
toSequenceNr: Long
145
): Source[EventEnvelope[Event], NotUsed]
146
}
147
```
148
149
**Usage Examples:**
150
151
```scala
152
import akka.persistence.query.typed.scaladsl.EventsByPersistenceIdTypedQuery
153
154
val readJournal: EventsByPersistenceIdTypedQuery = getTypedReadJournal()
155
156
// Query typed events for persistence ID
157
readJournal
158
.eventsByPersistenceIdTyped[UserEvent]("user-123", 0L, Long.MaxValue)
159
.runForeach { envelope =>
160
println(s"Typed event: ${envelope.event}")
161
println(s"Entity type: ${envelope.entityType}")
162
println(s"Event tags: ${envelope.tags}")
163
164
// Type-safe event processing
165
envelope.event match {
166
case UserCreated(name, email) => println(s"User $name created")
167
case UserUpdated(name, email) => println(s"User $name updated")
168
case UserDeleted(userId) => println(s"User $userId deleted")
169
}
170
}
171
```
172
173
### Current Events by Persistence ID Typed Query
174
175
Query current events for a specific persistence ID with typed envelopes.
176
177
```scala { .api }
178
/**
179
* A plugin may optionally support this query by implementing this trait.
180
* API May Change
181
*/
182
trait CurrentEventsByPersistenceIdTypedQuery extends ReadJournal {
183
/**
184
* Same as EventsByPersistenceIdTypedQuery but completed when reaching
185
* the end of currently stored events.
186
*/
187
def currentEventsByPersistenceIdTyped[Event](
188
persistenceId: String,
189
fromSequenceNr: Long,
190
toSequenceNr: Long
191
): Source[EventEnvelope[Event], NotUsed]
192
}
193
```
194
195
### Events by Persistence ID Starting from Snapshot
196
197
Query events starting from a snapshot with transformation.
198
199
```scala { .api }
200
/**
201
* A plugin may optionally support this query by implementing this trait.
202
* API May Change
203
*/
204
trait EventsByPersistenceIdStartingFromSnapshotQuery extends ReadJournal {
205
/**
206
* Query events starting from a snapshot. The snapshot is loaded and transformed
207
* to an event, then followed by events from the sequence number after the snapshot.
208
*/
209
def eventsByPersistenceIdStartingFromSnapshot[Snapshot, Event](
210
persistenceId: String,
211
fromSequenceNr: Long,
212
toSequenceNr: Long,
213
transformSnapshot: Snapshot => Event
214
): Source[EventEnvelope[Event], NotUsed]
215
}
216
```
217
218
**Usage Examples:**
219
220
```scala
221
import akka.persistence.query.typed.scaladsl.EventsByPersistenceIdStartingFromSnapshotQuery
222
223
val readJournal: EventsByPersistenceIdStartingFromSnapshotQuery = getTypedReadJournal()
224
225
// Transform snapshot to event and include in stream
226
readJournal
227
.eventsByPersistenceIdStartingFromSnapshot[UserSnapshot, UserEvent](
228
persistenceId = "user-123",
229
fromSequenceNr = 0L,
230
toSequenceNr = Long.MaxValue,
231
transformSnapshot = snapshot => UserStateRestored(snapshot.name, snapshot.email, snapshot.preferences)
232
)
233
.runForeach { envelope =>
234
envelope.event match {
235
case UserStateRestored(name, email, prefs) =>
236
println(s"Restored user state: $name")
237
case otherEvent =>
238
println(s"Regular event: $otherEvent")
239
}
240
}
241
```
242
243
### Events by Slice Starting from Snapshots
244
245
Query events by slice starting from snapshots with transformation.
246
247
```scala { .api }
248
/**
249
* A plugin may optionally support this query by implementing this trait.
250
* API May Change
251
*/
252
trait EventsBySliceStartingFromSnapshotsQuery extends ReadJournal {
253
/**
254
* Query events by slice starting from snapshots. Snapshots are loaded and transformed
255
* to events, then followed by events from the sequence number after the snapshot.
256
*/
257
def eventsBySlicesStartingFromSnapshots[Snapshot, Event](
258
entityType: String,
259
minSlice: Int,
260
maxSlice: Int,
261
offset: Offset,
262
transformSnapshot: Snapshot => Event
263
): Source[EventEnvelope[Event], NotUsed]
264
265
def sliceForPersistenceId(persistenceId: String): Int
266
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
267
}
268
```
269
270
**Usage Examples:**
271
272
```scala
273
import akka.persistence.query.typed.scaladsl.EventsBySliceStartingFromSnapshotsQuery
274
275
val readJournal: EventsBySliceStartingFromSnapshotsQuery = getTypedReadJournal()
276
277
// Query slice events starting from snapshots
278
readJournal
279
.eventsBySlicesStartingFromSnapshots[OrderSnapshot, OrderEvent](
280
entityType = "Order",
281
minSlice = 0,
282
maxSlice = 255,
283
offset = offset,
284
transformSnapshot = snapshot => OrderStateRestored(
285
orderId = snapshot.orderId,
286
items = snapshot.items,
287
total = snapshot.total,
288
status = snapshot.status
289
)
290
)
291
.runForeach { envelope =>
292
envelope.event match {
293
case OrderStateRestored(orderId, items, total, status) =>
294
println(s"Restored order $orderId with $total")
295
case regularEvent =>
296
processOrderEvent(regularEvent)
297
}
298
}
299
```
300
301
### Load Event Query
302
303
Load individual events on demand.
304
305
```scala { .api }
306
/**
307
* A plugin may optionally support this query by implementing this trait.
308
* API May Change
309
*/
310
trait LoadEventQuery extends ReadJournal {
311
/**
312
* Load a specific event envelope by persistence ID and sequence number.
313
* Useful for loading events that were not included in the original query result.
314
*/
315
def loadEnvelope[Event](
316
persistenceId: String,
317
sequenceNr: Long
318
): Future[EventEnvelope[Event]]
319
}
320
```
321
322
**Usage Examples:**
323
324
```scala
325
import akka.persistence.query.typed.scaladsl.LoadEventQuery
326
327
val readJournal: LoadEventQuery = getTypedReadJournal()
328
329
// Load specific event on demand
330
def processEnvelopeWithLoading[Event](envelope: EventEnvelope[Event]): Future[Unit] = {
331
envelope.eventOption match {
332
case Some(event) =>
333
Future.successful(processEvent(event))
334
case None if !envelope.filtered =>
335
// Event not loaded, load it on demand
336
readJournal
337
.loadEnvelope[Event](envelope.persistenceId, envelope.sequenceNr)
338
.map(loadedEnvelope => processEvent(loadedEnvelope.event))
339
case None =>
340
// Event was filtered, skip processing
341
Future.successful(())
342
}
343
}
344
345
// Use with slice queries
346
readJournal
347
.asInstanceOf[EventsBySliceQuery]
348
.eventsBySlices[UserEvent]("User", 0, 127, offset)
349
.mapAsync(4)(processEnvelopeWithLoading)
350
.runWith(Sink.ignore)
351
```
352
353
### Event Timestamp Query
354
355
Query timestamps for specific events.
356
357
```scala { .api }
358
/**
359
* A plugin may optionally support this query by implementing this trait.
360
* API May Change
361
*/
362
trait EventTimestampQuery extends ReadJournal {
363
/**
364
* Get the timestamp of a specific event by persistence ID and sequence number.
365
* Returns None if the event doesn't exist.
366
*/
367
def timestampOf(
368
persistenceId: String,
369
sequenceNr: Long
370
): Future[Option[Instant]]
371
}
372
```
373
374
**Usage Examples:**
375
376
```scala
377
import akka.persistence.query.typed.scaladsl.EventTimestampQuery
378
import java.time.Instant
379
380
val readJournal: EventTimestampQuery = getTypedReadJournal()
381
382
// Get timestamp of specific event
383
readJournal
384
.timestampOf("user-123", 42L)
385
.foreach {
386
case Some(timestamp) =>
387
println(s"Event 42 was stored at: $timestamp")
388
case None =>
389
println("Event 42 not found")
390
}
391
392
// Check event age
393
def checkEventAge(persistenceId: String, sequenceNr: Long): Future[Unit] = {
394
readJournal
395
.timestampOf(persistenceId, sequenceNr)
396
.map {
397
case Some(timestamp) =>
398
val age = java.time.Duration.between(timestamp, Instant.now())
399
if (age.toDays > 30) {
400
println(s"Event is ${age.toDays} days old")
401
}
402
case None =>
403
println("Event not found")
404
}
405
}
406
```
407
408
## Slice-Based Processing Patterns
409
410
### Distributed Event Processing
411
412
Use slices to distribute event processing across multiple nodes:
413
414
```scala
415
def startSliceProcessor(processorId: Int, totalProcessors: Int): Unit = {
416
val readJournal: EventsBySliceQuery = getTypedReadJournal()
417
418
// Calculate slice range for this processor
419
val sliceRanges = readJournal.sliceRanges(totalProcessors)
420
val myRange = sliceRanges(processorId)
421
422
println(s"Processor $processorId handling slices ${myRange.start} to ${myRange.end}")
423
424
readJournal
425
.eventsBySlices[DomainEvent]("Entity", myRange.start, myRange.end, offset)
426
.runForeach { envelope =>
427
// Process events for this slice range
428
processEvent(envelope.event)
429
430
// Update offset for this processor
431
saveProcessorOffset(processorId, envelope.offset)
432
}
433
}
434
435
// Start 8 processors
436
(0 until 8).foreach(startSliceProcessor(_, 8))
437
```
438
439
### Entity Type Filtering
440
441
Process different entity types with separate processors:
442
443
```scala
444
val entityTypes = List("User", "Order", "Payment", "Inventory")
445
446
entityTypes.foreach { entityType =>
447
readJournal
448
.eventsBySlices[DomainEvent](entityType, 0, 1023, offset)
449
.runForeach { envelope =>
450
entityType match {
451
case "User" => processUserEvent(envelope.event)
452
case "Order" => processOrderEvent(envelope.event)
453
case "Payment" => processPaymentEvent(envelope.event)
454
case "Inventory" => processInventoryEvent(envelope.event)
455
}
456
}
457
}
458
```
459
460
### Snapshot Integration
461
462
Combine snapshots with event streams for efficient state reconstruction:
463
464
```scala
465
def buildProjectionFromSnapshots[State, Event](
466
entityType: String,
467
slice: Int,
468
initialState: State,
469
applyEvent: (State, Event) => State,
470
applySnapshot: UserSnapshot => State
471
): Future[State] = {
472
473
readJournal
474
.eventsBySlicesStartingFromSnapshots[UserSnapshot, Event](
475
entityType = entityType,
476
minSlice = slice,
477
maxSlice = slice,
478
offset = TimestampOffset.Zero,
479
transformSnapshot = snapshot => SnapshotRestored(snapshot).asInstanceOf[Event]
480
)
481
.runFold(initialState) { (state, envelope) =>
482
envelope.event match {
483
case SnapshotRestored(snapshot) =>
484
applySnapshot(snapshot.asInstanceOf[UserSnapshot])
485
case event =>
486
applyEvent(state, event)
487
}
488
}
489
}
490
```
491
492
## Java API
493
494
All typed Scala query traits have corresponding Java API equivalents:
495
496
```java
497
import akka.persistence.query.typed.javadsl.*;
498
import java.util.concurrent.CompletionStage;
499
import java.util.List;
500
import akka.japi.Pair;
501
502
// Java API usage
503
EventsBySliceQuery readJournal = getJavaTypedReadJournal();
504
505
// Query events by slice
506
readJournal
507
.eventsBySlices(UserEvent.class, "User", 0, 127, offset)
508
.runForeach(envelope -> {
509
System.out.println("Event: " + envelope.getEvent());
510
System.out.println("Entity type: " + envelope.entityType());
511
System.out.println("Slice: " + envelope.slice());
512
}, system);
513
514
// Get slice ranges for distribution
515
List<Pair<Integer, Integer>> ranges = readJournal.sliceRanges(4);
516
ranges.forEach(range -> {
517
System.out.println("Range: " + range.first() + " to " + range.second());
518
});
519
520
// Load event on demand
521
LoadEventQuery loadQuery = (LoadEventQuery) readJournal;
522
CompletionStage<EventEnvelope<UserEvent>> future =
523
loadQuery.loadEnvelope(UserEvent.class, "user-123", 42L);
524
```
525
526
## Error Handling
527
528
### Event Loading Failures
529
530
Handle failures when loading events on demand:
531
532
```scala
533
def safeLoadEvent[Event](
534
persistenceId: String,
535
sequenceNr: Long
536
): Future[Option[Event]] = {
537
readJournal
538
.asInstanceOf[LoadEventQuery]
539
.loadEnvelope[Event](persistenceId, sequenceNr)
540
.map(envelope => Some(envelope.event))
541
.recover {
542
case _: NoSuchElementException => None
543
case ex =>
544
println(s"Failed to load event: $ex")
545
None
546
}
547
}
548
```
549
550
### Slice Processing Failures
551
552
Handle failures in slice-based processing:
553
554
```scala
555
def resilientSliceProcessing(slice: Int): Unit = {
556
readJournal
557
.eventsBySlices[DomainEvent]("Entity", slice, slice, offset)
558
.recover {
559
case ex: Exception =>
560
println(s"Error in slice $slice: $ex")
561
// Return empty envelope or restart logic
562
EventEnvelope.empty
563
}
564
.runForeach { envelope =>
565
try {
566
processEvent(envelope.event)
567
} catch {
568
case ex: Exception =>
569
println(s"Failed to process event in slice $slice: $ex")
570
}
571
}
572
}
573
```
574
575
## Types
576
577
```scala { .api }
578
case class UserEvent(userId: String, eventType: String, data: Map[String, Any])
579
case class OrderEvent(orderId: String, eventType: String, data: Map[String, Any])
580
case class DomainEvent(entityId: String, eventType: String, payload: Any)
581
582
case class UserCreated(name: String, email: String)
583
case class UserUpdated(name: String, email: String)
584
case class UserDeleted(userId: String)
585
case class UserStateRestored(name: String, email: String, preferences: Map[String, String])
586
587
case class OrderStateRestored(orderId: String, items: List[String], total: BigDecimal, status: String)
588
case class SnapshotRestored[T](snapshot: T)
589
590
case class UserSnapshot(name: String, email: String, preferences: Map[String, String])
591
case class OrderSnapshot(orderId: String, items: List[String], total: BigDecimal, status: String)
592
593
import scala.concurrent.Future
594
import java.time.Instant
595
import akka.stream.scaladsl.{Source, Sink}
596
import akka.NotUsed
597
import scala.collection.immutable
598
```