# Durable State Queries

Durable state queries provide interfaces for querying state changes and persistence IDs from durable state stores. These complement event-based queries by focusing on state changes rather than individual events.

## Capabilities

### Durable State Change Types

Base trait and implementations for representing state changes.

```scala { .api }
/**
 * The DurableStateStoreQuery stream elements for DurableStateStoreQuery.
 * The implementation can be UpdatedDurableState or DeletedDurableState.
 * Not for user extension.
 */
sealed trait DurableStateChange[A] {
  /** The persistence id of the origin entity */
  def persistenceId: String

  /** The offset that can be used in next changes or currentChanges query */
  def offset: Offset
}
```

### Updated Durable State

Represents an updated state change with the new value.

```scala { .api }
/**
 * Updated durable state change containing the new state value.
 *
 * @param persistenceId The persistence id of the origin entity
 * @param revision The revision number from the origin entity
 * @param value The new state value
 * @param offset The offset that can be used in next changes or currentChanges query
 * @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
 */
final class UpdatedDurableState[A](
    val persistenceId: String,
    val revision: Long,
    val value: A,
    override val offset: Offset,
    val timestamp: Long
) extends DurableStateChange[A]
```
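
The `timestamp` field is a plain epoch-millisecond `Long`. As a minimal sketch of turning it into something easier to display or compare, the helper below uses only `java.time`. The `TimestampSketch` object and its method names are illustrative assumptions and are not part of the Akka API.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper, not part of Akka: works on the raw Long timestamp only.
object TimestampSketch {
  /** Interprets a change timestamp (epoch milliseconds, UTC) as an Instant. */
  def toInstant(timestampMillis: Long): Instant =
    Instant.ofEpochMilli(timestampMillis)

  /** Age of a change relative to `now`; negative if the timestamp lies in the future. */
  def age(timestampMillis: Long, now: Instant): Duration =
    Duration.between(toInstant(timestampMillis), now)
}
```

For an update `u`, `TimestampSketch.toInstant(u.timestamp)` would give a value that prints in ISO-8601 form.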

**Usage Examples:**

```scala
import akka.persistence.query.{DeletedDurableState, UpdatedDurableState}

// `readJournal` (a DurableStateStoreQuery[UserState]), `offset`, and the
// process*/updateProjection helpers are assumed to be defined elsewhere.

// Pattern matching on state changes
readJournal
  .changes("user-states", offset)
  .runForeach {
    case updated: UpdatedDurableState[UserState] =>
      println(s"User ${updated.persistenceId} updated to revision ${updated.revision}")
      println(s"New state: ${updated.value}")
      println(s"Updated at: ${updated.timestamp}")

      // Process the updated state
      processUserStateUpdate(updated.value)

    case deleted: DeletedDurableState[UserState] =>
      println(s"User ${deleted.persistenceId} deleted at revision ${deleted.revision}")
      processUserStateDeletion(deleted.persistenceId)
  }

// Extract fields using unapply
readJournal
  .changes("user-states", offset)
  .runForeach {
    // `changeOffset` avoids shadowing the outer `offset`
    case UpdatedDurableState(persistenceId, revision, value, changeOffset, timestamp) =>
      println(s"Updated: $persistenceId at revision $revision")
      updateProjection(persistenceId, value.asInstanceOf[UserState])

    // Ignore deletions here; without this case a deletion would cause a MatchError
    case _ => ()
  }
```

### Deleted Durable State

Represents a deleted state change.

```scala { .api }
/**
 * Deleted durable state change indicating the state was removed.
 *
 * @param persistenceId The persistence id of the origin entity
 * @param revision The revision number from the origin entity
 * @param offset The offset that can be used in next changes or currentChanges query
 * @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
 */
final class DeletedDurableState[A](
    val persistenceId: String,
    val revision: Long,
    override val offset: Offset,
    val timestamp: Long
) extends DurableStateChange[A]
```

**Usage Examples:**

```scala
import akka.persistence.query.{DeletedDurableState, UpdatedDurableState}

// `readJournal`, `offset`, and the helper functions are assumed to be defined elsewhere.

// Handle deleted state
readJournal
  .changes("user-states", offset)
  .runForeach {
    case deleted: DeletedDurableState[UserState] =>
      println(s"User ${deleted.persistenceId} deleted at revision ${deleted.revision}")

      // Clean up related data
      cleanupUserData(deleted.persistenceId)

      // Update read models
      removeFromReadModel(deleted.persistenceId)

    case updated: UpdatedDurableState[UserState] =>
      // Handle updates
      processUserStateUpdate(updated.value)
  }

// Pattern matching with extraction
readJournal
  .changes("user-states", offset)
  .runForeach {
    // `changeOffset` avoids shadowing the outer `offset`
    case DeletedDurableState(persistenceId, revision, changeOffset, timestamp) =>
      println(s"Deleted: $persistenceId at revision $revision, timestamp $timestamp")
      handleDeletion(persistenceId)

    // Ignore updates here; without this case an update would cause a MatchError
    case _ => ()
  }
```

### Durable State Store Query

Main query interface for durable state changes.

```scala { .api }
/**
 * Query API for reading durable state objects.
 */
trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
  /**
   * Get a source of the most recent changes made to objects with the given tag since the passed in offset.
   *
   * Note that this only returns the most recent change to each object, if an object has been updated multiple times
   * since the offset, only the most recent of those changes will be part of the stream.
   *
   * This will return changes that occurred up to when the Source returned by this call is materialized. Changes to
   * objects made since materialization are not guaranteed to be included in the results.
   *
   * @param tag The tag to get changes for
   * @param offset The offset to get changes since. Must either be NoOffset to get changes since the beginning of time,
   *               or an offset that has been previously returned by this query
   * @return A source of state changes
   */
  def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]

  /**
   * Get a source of the most recent changes made to objects of the given tag since the passed in offset.
   *
   * The returned source will never terminate, it effectively watches for changes to the objects and emits changes as
   * they happen.
   *
   * Not all changes that occur are guaranteed to be emitted, this call only guarantees that eventually, the most
   * recent change for each object since the offset will be emitted. In particular, multiple updates to a given object
   * in quick succession are likely to be skipped, with only the last update resulting in a change from this
   * source.
   *
   * @param tag The tag to get changes for
   * @param offset The offset to get changes since. Must either be NoOffset to get changes since the beginning of time,
   *               or an offset that has been previously returned by this query
   * @return A source of state changes
   */
  def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
}
```
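
The "most recent change per object" guarantee described in the comments above can be modelled without Akka. The following is a rough sketch, not the store's implementation: `Change` is a hypothetical stand-in for `DurableStateChange`, and its numeric `seqNr` is an assumed simplification of an `Offset`. It collapses a change history to the minimum a consumer can rely on seeing.

```scala
// Hypothetical model, not Akka code: illustrates the "latest change wins" semantics.
object LatestChangeSketch {
  // seqNr plays the role of the offset in this simplified model.
  final case class Change(persistenceId: String, revision: Long, seqNr: Long)

  /**
   * Keeps only the latest change per persistence id among those recorded
   * after `sinceSeqNr`, ordered by seqNr.
   */
  def latestPerEntity(history: Seq[Change], sinceSeqNr: Long): Seq[Change] =
    history
      .filter(_.seqNr > sinceSeqNr)
      .groupBy(_.persistenceId)
      .values
      .map(_.maxBy(_.seqNr))
      .toSeq
      .sortBy(_.seqNr)
}
```

A consumer that needs every intermediate revision cannot depend on these queries; in this model, three updates to the same entity collapse into one element.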

**Usage Examples:**

```scala
import akka.persistence.query.scaladsl.DurableStateStoreQuery
import akka.persistence.query.{DeletedDurableState, NoOffset, Offset, UpdatedDurableState}
import scala.util.{Failure, Success}

// Assumes an implicit ActorSystem (for running streams) and an implicit
// ExecutionContext (for onComplete) are in scope.
val stateStore: DurableStateStoreQuery[UserState] = getDurableStateStore()

// Query current state changes (finite stream)
stateStore
  .currentChanges("user-states", NoOffset)
  .runForeach {
    case updated: UpdatedDurableState[UserState] =>
      println(s"Current user state: ${updated.value}")
      buildInitialProjection(updated.persistenceId, updated.value)

    case deleted: DeletedDurableState[UserState] =>
      println(s"User ${deleted.persistenceId} is deleted")
  }
  .onComplete {
    case Success(_)  => println("Finished processing current states")
    case Failure(ex) => println(s"Failed to process current states: $ex")
  }

// Query live state changes (infinite stream)
var lastOffset: Offset = NoOffset

stateStore
  .changes("user-states", lastOffset)
  .runForeach { change =>
    change match {
      case updated: UpdatedDurableState[UserState] =>
        updateReadModel(updated.persistenceId, updated.value)

      case deleted: DeletedDurableState[UserState] =>
        removeFromReadModel(deleted.persistenceId)
    }

    // Update offset for resumption
    lastOffset = change.offset
    saveOffset(lastOffset)
  }
```

### Durable State Store Paged Persistence IDs Query

Query interface for paginated persistence IDs from durable state stores.

```scala { .api }
/**
 * A plugin may optionally support this query by implementing this trait.
 */
trait DurableStateStorePagedPersistenceIdsQuery[A] extends DurableStateStore[A] {
  /**
   * Get current persistence ids with pagination support.
   *
   * @param afterId Start after this persistence ID (exclusive)
   * @param limit Maximum number of persistence IDs to return
   * @return Source of persistence IDs
   */
  def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]
}
```

**Usage Examples:**

```scala
import akka.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// Assumes an implicit ActorSystem and ExecutionContext are in scope
val stateStore: DurableStateStorePagedPersistenceIdsQuery[UserState] = getDurableStateStore()
val pageSize = 100L

// Get first page of persistence IDs
stateStore
  .currentPersistenceIds(None, pageSize)
  .runForeach { persistenceId =>
    println(s"Found persistence ID: $persistenceId")

    // Load current state for this persistence ID; no cast is needed because
    // the query trait extends DurableStateStore
    stateStore
      .getObject(persistenceId)
      .foreach {
        case Some(GetObjectResult(userState, revision)) =>
          println(s"Current state for $persistenceId: $userState")
        case None =>
          println(s"No state found for $persistenceId")
      }
  }

// Paginate through all persistence IDs
def processAllPersistenceIds(afterId: Option[String] = None): Future[Unit] = {
  stateStore
    .currentPersistenceIds(afterId, pageSize)
    .runWith(Sink.seq)
    .flatMap { ids =>
      println(s"Processing ${ids.size} persistence IDs")

      // Process this batch
      ids.foreach(processPersistenceId)

      if (ids.size == pageSize) {
        // A full page means more may be available; continue after the last ID seen
        processAllPersistenceIds(ids.lastOption)
      } else {
        // A short page is the last page, we're done
        Future.successful(())
      }
    }
}
```
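
The paging loop can also be written independently of the store, which makes it easy to test. The sketch below is one possible shape, under stated assumptions: `fetchPage` is a hypothetical function standing in for running `currentPersistenceIds(afterId, limit)` into `Sink.seq`, and `PagingSketch` is an illustrative name rather than anything provided by Akka.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper, not part of Akka: generic "fetch after last id" pagination.
object PagingSketch {
  /**
   * Collects all ids by repeatedly requesting the page that follows the
   * last id seen, stopping at the first page shorter than `pageSize`.
   */
  def collectAll(
      fetchPage: (Option[String], Long) => Future[Seq[String]],
      pageSize: Long)(implicit ec: ExecutionContext): Future[Vector[String]] = {
    require(pageSize > 0, "pageSize must be positive")

    def loop(afterId: Option[String], acc: Vector[String]): Future[Vector[String]] =
      fetchPage(afterId, pageSize).flatMap { ids =>
        val collected = acc ++ ids
        // A short (or empty) page means there is nothing left to fetch.
        if (ids.size < pageSize) Future.successful(collected)
        else loop(ids.lastOption, collected)
      }

    loop(None, Vector.empty)
  }
}
```

Because each step is chained through `flatMap` on a `Future`, the recursion does not grow the call stack; the accumulated vector does grow with the number of ids, so for very large stores processing each page as it arrives may be preferable.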

### Durable State Store by Slice Query

Slice-based query interface for durable state stores (typed API).

```scala { .api }
/**
 * A plugin may optionally support this query by implementing this trait.
 * API May Change
 */
trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
  /**
   * Get current state changes by slice range.
   */
  def currentChangesBySlices(
      entityType: String,
      minSlice: Int,
      maxSlice: Int,
      offset: Offset
  ): Source[DurableStateChange[A], NotUsed]

  /**
   * Get live state changes by slice range.
   */
  def changesBySlices(
      entityType: String,
      minSlice: Int,
      maxSlice: Int,
      offset: Offset
  ): Source[DurableStateChange[A], NotUsed]

  /** Get the slice number for a given persistence ID */
  def sliceForPersistenceId(persistenceId: String): Int

  /** Get slice ranges for distributing the load */
  def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}
```

**Usage Examples:**

```scala
import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
import akka.persistence.query.{DeletedDurableState, UpdatedDurableState}

// `offset` and the process* helpers are assumed to be defined elsewhere
val stateStore: DurableStateStoreBySliceQuery[UserState] = getTypedDurableStateStore()

// Query state changes by slice range
stateStore
  .changesBySlices("User", 0, 255, offset)
  .runForeach { change =>
    change match {
      case updated: UpdatedDurableState[UserState] =>
        println(s"User ${updated.persistenceId} updated in slice ${stateStore.sliceForPersistenceId(updated.persistenceId)}")
        processUserUpdate(updated.value)

      case deleted: DeletedDurableState[UserState] =>
        println(s"User ${deleted.persistenceId} deleted")
        processUserDeletion(deleted.persistenceId)
    }
  }

// Distribute state processing across multiple slices
val sliceRanges = stateStore.sliceRanges(4)
sliceRanges.zipWithIndex.foreach { case (range, processorId) =>
  // Use min/max rather than start/end: for an exclusive Range, `end` is one
  // past the last slice, while minSlice and maxSlice are both inclusive
  println(s"Processor $processorId handles slices ${range.min} to ${range.max}")

  stateStore
    .changesBySlices("User", range.min, range.max, offset)
    .runForeach { change =>
      processStateChangeInProcessor(processorId, change)
    }
}
```
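
To make the slice arithmetic concrete, here is a self-contained sketch of how a slice space could be partitioned. It assumes 1024 slices and a hash-modulo mapping from persistence ID to slice; both are assumptions for illustration that should be checked against the plugin in use, and `SliceSketch` is not an Akka class.

```scala
// Hypothetical model of slice partitioning; the constants are assumptions.
object SliceSketch {
  // Assumption: the slice space has 1024 slices.
  val NumberOfSlices = 1024

  /** Maps a persistence id to a slice in [0, NumberOfSlices). */
  def sliceForPersistenceId(persistenceId: String): Int =
    math.abs(persistenceId.hashCode % NumberOfSlices)

  /** Splits the slice space into equally sized, contiguous, inclusive ranges. */
  def sliceRanges(numberOfRanges: Int): Vector[Range] = {
    require(
      numberOfRanges > 0 && NumberOfSlices % numberOfRanges == 0,
      s"numberOfRanges must divide $NumberOfSlices evenly")
    val size = NumberOfSlices / numberOfRanges
    (0 until numberOfRanges).map(i => (i * size) to (i * size + size - 1)).toVector
  }
}
```

Under these assumptions, four ranges cover slices 0 to 255, 256 to 511, 512 to 767, and 768 to 1023, so every persistence ID falls into exactly one processor's range.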

## State Processing Patterns

### State Projection Building

Build read model projections from state changes:

```scala
case class UserProjection(
    persistenceId: String,
    name: String,
    email: String,
    lastUpdated: Long,
    revision: Long
)

class UserProjectionBuilder(stateStore: DurableStateStoreQuery[UserState]) {

  def buildProjections(): Future[Map[String, UserProjection]] = {
    stateStore
      .currentChanges("user-states", NoOffset)
      .runFold(Map.empty[String, UserProjection]) { (projections, change) =>
        change match {
          case updated: UpdatedDurableState[UserState] =>
            val projection = UserProjection(
              persistenceId = updated.persistenceId,
              name = updated.value.name,
              email = updated.value.email,
              lastUpdated = updated.timestamp,
              revision = updated.revision
            )
            projections + (updated.persistenceId -> projection)

          case deleted: DeletedDurableState[UserState] =>
            projections - deleted.persistenceId
        }
      }
  }

  def maintainProjections(initialProjections: Map[String, UserProjection]): Unit = {
    var projections = initialProjections
    var lastOffset: Offset = NoOffset

    stateStore
      .changes("user-states", lastOffset)
      .runForeach { change =>
        change match {
          case updated: UpdatedDurableState[UserState] =>
            val projection = UserProjection(
              persistenceId = updated.persistenceId,
              name = updated.value.name,
              email = updated.value.email,
              lastUpdated = updated.timestamp,
              revision = updated.revision
            )
            projections = projections + (updated.persistenceId -> projection)
            saveProjection(projection)

          case deleted: DeletedDurableState[UserState] =>
            projections = projections - deleted.persistenceId
            deleteProjection(deleted.persistenceId)
        }

        lastOffset = change.offset
        saveOffset(lastOffset)
      }
  }
}
```
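
Because the offset in the example above is saved only after a change has been applied, a restart may replay a change that the read model already contains. One way to make the projection tolerant of such replays is to compare revisions before writing. The sketch below illustrates the idea with a hypothetical `Row` type; it is an assumption about how a consumer might guard its writes, not something the query API does for you.

```scala
// Hypothetical read-model guard, not part of Akka.
object RevisionGuardSketch {
  // Simplified projection row; only the revision matters for the guard.
  final case class Row(persistenceId: String, name: String, revision: Long)

  /** Applies an update only if it is newer than what the read model already holds. */
  def applyUpdate(model: Map[String, Row], incoming: Row): Map[String, Row] =
    model.get(incoming.persistenceId) match {
      // Stale or duplicate delivery: keep the existing row.
      case Some(existing) if existing.revision >= incoming.revision => model
      case _ => model.updated(incoming.persistenceId, incoming)
    }
}
```

With this guard, applying the same change twice leaves the read model unchanged, so replaying from an older stored offset is harmless for updates.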

### State Change Filtering

Filter state changes based on criteria:

```scala
// Filter by revision numbers
stateStore
  .changes("user-states", offset)
  .filter {
    case updated: UpdatedDurableState[UserState] => updated.revision > 5L
    case deleted: DeletedDurableState[UserState] => deleted.revision > 5L
  }
  .runForeach(processImportantChange)

// Filter by timestamp (last hour only)
val oneHourAgo = System.currentTimeMillis() - (60L * 60 * 1000)

stateStore
  .changes("user-states", offset)
  .filter {
    // timestamp is defined on the concrete change types, not on DurableStateChange
    case updated: UpdatedDurableState[UserState] => updated.timestamp > oneHourAgo
    case deleted: DeletedDurableState[UserState] => deleted.timestamp > oneHourAgo
  }
  .runForeach(processRecentChange)

// Filter by state content
stateStore
  .changes("user-states", offset)
  .collect {
    case updated: UpdatedDurableState[UserState] if updated.value.isActive =>
      updated
  }
  .runForeach(processActiveUserUpdate)
```

### State Change Aggregation

Aggregate state changes for analytics:

```scala
case class StateChangeMetrics(
    totalUpdates: Long,
    totalDeletes: Long,
    uniqueEntities: Set[String],
    lastProcessed: Long
)

def aggregateStateChanges(): Future[StateChangeMetrics] = {
  stateStore
    .currentChanges("user-states", NoOffset)
    .runFold(StateChangeMetrics(0L, 0L, Set.empty, 0L)) { (metrics, change) =>
      change match {
        case updated: UpdatedDurableState[UserState] =>
          metrics.copy(
            totalUpdates = metrics.totalUpdates + 1,
            uniqueEntities = metrics.uniqueEntities + updated.persistenceId,
            lastProcessed = math.max(metrics.lastProcessed, updated.timestamp)
          )

        case deleted: DeletedDurableState[UserState] =>
          metrics.copy(
            totalDeletes = metrics.totalDeletes + 1,
            uniqueEntities = metrics.uniqueEntities + deleted.persistenceId,
            lastProcessed = math.max(metrics.lastProcessed, deleted.timestamp)
          )
      }
    }
}
```

## Error Handling

### Change Processing Failures

Handle failures in state change processing:

```scala
stateStore
  .changes("user-states", offset)
  .recover {
    case ex: Exception =>
      println(s"Error in state change stream: $ex")
      // recover emits this single placeholder element and then completes the
      // stream; restart logic has to be added separately.
      // UpdatedDurableState is a plain class, so it is constructed with `new`.
      new UpdatedDurableState[UserState]("error", 0L, UserState.empty, NoOffset, System.currentTimeMillis())
  }
  .runForeach { change =>
    try {
      processStateChange(change)
    } catch {
      case ex: Exception =>
        println(s"Failed to process state change: $ex")
        handleProcessingFailure(change, ex)
    }
  }
```

### Offset Management

Safely handle offset storage and retrieval:

```scala
class StateChangeProcessor(stateStore: DurableStateStoreQuery[UserState]) {
  private var currentOffset: Offset = loadStoredOffset().getOrElse(NoOffset)

  def start(): Unit = {
    stateStore
      .changes("user-states", currentOffset)
      .runForeach { change =>
        try {
          processChange(change)
          currentOffset = change.offset
          saveOffset(currentOffset)
        } catch {
          case ex: Exception =>
            println(s"Processing failed, keeping offset at $currentOffset: $ex")
            // Don't update offset on failure
        }
      }
  }

  def restart(): Unit = {
    // Reload offset and restart processing
    currentOffset = loadStoredOffset().getOrElse(NoOffset)
    start()
  }
}
```

## Java API

Java API equivalents are available in the `javadsl` package:

```java
import akka.persistence.query.DeletedDurableState;
import akka.persistence.query.NoOffset;
import akka.persistence.query.UpdatedDurableState;
import akka.persistence.query.javadsl.DurableStateStoreQuery;

// Java API usage; `system` is an ActorSystem assumed to be in scope
DurableStateStoreQuery<UserState> stateStore = getJavaDurableStateStore();

// Query current changes
stateStore
    .currentChanges("user-states", NoOffset.getInstance())
    .runForeach(change -> {
      if (change instanceof UpdatedDurableState) {
        UpdatedDurableState<UserState> updated = (UpdatedDurableState<UserState>) change;
        System.out.println("Updated: " + updated.persistenceId());
        processUserUpdate(updated.value());
      } else if (change instanceof DeletedDurableState) {
        DeletedDurableState<UserState> deleted = (DeletedDurableState<UserState>) change;
        System.out.println("Deleted: " + deleted.persistenceId());
        processUserDeletion(deleted.persistenceId());
      }
    }, system);
```

## Types

```scala { .api }
import akka.Done
import akka.NotUsed
import akka.stream.scaladsl.{Source, Sink}
import scala.collection.immutable
import scala.concurrent.Future
import scala.util.{Success, Failure}

case class UserState(
    name: String,
    email: String,
    isActive: Boolean,
    preferences: Map[String, String]
) {
  def isEmpty: Boolean = name.isEmpty && email.isEmpty
}

object UserState {
  val empty: UserState = UserState("", "", false, Map.empty)
}

case class GetObjectResult[A](value: A, revision: Long)

trait DurableStateStore[A] {
  def getObject(persistenceId: String): Future[Option[GetObjectResult[A]]]
  def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done]
  def deleteObject(persistenceId: String): Future[Done]
}
```