or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

durable-state.md event-envelopes.md extension.md index.md journal-implementations.md offsets.md typed-queries.md untyped-queries.md

docs/durable-state.md

0

# Durable State Queries

1

2

Durable state queries provide interfaces for querying state changes and persistence IDs from durable state stores. These complement event-based queries by focusing on state changes rather than individual events.

3

4

## Capabilities

5

6

### Durable State Change Types

7

8

Base trait and implementations for representing state changes.

9

10

```scala { .api }

11

/**

12

* The element type of the streams returned by DurableStateStoreQuery.

13

* The implementation can be UpdatedDurableState or DeletedDurableState.

14

* Not for user extension.

15

*/

16

sealed trait DurableStateChange[A] {

17

/** The persistence id of the origin entity */

18

def persistenceId: String

19

20

/** The offset that can be used in next changes or currentChanges query */

21

def offset: Offset

22

}

23

```

24

25

### Updated Durable State

26

27

Represents an updated state change with the new value.

28

29

```scala { .api }

30

/**

31

* Updated durable state change containing the new state value.

32

*

33

* @param persistenceId The persistence id of the origin entity

34

* @param revision The revision number from the origin entity

35

* @param value The new state value

36

* @param offset The offset that can be used in next changes or currentChanges query

37

* @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC

38

*/

39

final class UpdatedDurableState[A](

40

val persistenceId: String,

41

val revision: Long,

42

val value: A,

43

override val offset: Offset,

44

val timestamp: Long

45

) extends DurableStateChange[A]

46

```

47

48

**Usage Examples:**

49

50

```scala

51

import akka.persistence.query.{UpdatedDurableState, Sequence}

52

53

// Pattern matching on state changes

54

readJournal

55

.changes("user-states", offset)

56

.runForeach {

57

case updated: UpdatedDurableState[UserState] =>

58

println(s"User ${updated.persistenceId} updated to revision ${updated.revision}")

59

println(s"New state: ${updated.value}")

60

println(s"Updated at: ${updated.timestamp}")

61

62

// Process the updated state

63

processUserStateUpdate(updated.value)

64

65

case deleted: DeletedDurableState[UserState] =>

66

println(s"User ${deleted.persistenceId} deleted at revision ${deleted.revision}")

67

processUserStateDeletion(deleted.persistenceId)

68

}

69

70

// Extract fields using unapply

71

readJournal

72

.changes("user-states", offset)

73

.runForeach {

74

case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) =>

75

println(s"Updated: $persistenceId at revision $revision")

76

updateProjection(persistenceId, value.asInstanceOf[UserState])

77

}

78

```

79

80

### Deleted Durable State

81

82

Represents a deleted state change.

83

84

```scala { .api }

85

/**

86

* Deleted durable state change indicating the state was removed.

87

*

88

* @param persistenceId The persistence id of the origin entity

89

* @param revision The revision number from the origin entity

90

* @param offset The offset that can be used in next changes or currentChanges query

91

* @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC

92

*/

93

final class DeletedDurableState[A](

94

val persistenceId: String,

95

val revision: Long,

96

override val offset: Offset,

97

val timestamp: Long

98

) extends DurableStateChange[A]

99

```

100

101

**Usage Examples:**

102

103

```scala

104

import akka.persistence.query.DeletedDurableState

105

106

// Handle deleted state

107

readJournal

108

.changes("user-states", offset)

109

.runForeach {

110

case deleted: DeletedDurableState[UserState] =>

111

println(s"User ${deleted.persistenceId} deleted at revision ${deleted.revision}")

112

113

// Clean up related data

114

cleanupUserData(deleted.persistenceId)

115

116

// Update read models

117

removeFromReadModel(deleted.persistenceId)

118

119

case updated: UpdatedDurableState[UserState] =>

120

// Handle updates

121

processUserStateUpdate(updated.value)

122

}

123

124

// Pattern matching with extraction

125

readJournal

126

.changes("user-states", offset)

127

.runForeach {

128

case DeletedDurableState(persistenceId, revision, offset, timestamp) =>

129

println(s"Deleted: $persistenceId at revision $revision, timestamp $timestamp")

130

handleDeletion(persistenceId)

131

}

132

```

133

134

### Durable State Store Query

135

136

Main query interface for durable state changes.

137

138

```scala { .api }

139

/**

140

* Query API for reading durable state objects.

141

*/

142

trait DurableStateStoreQuery[A] extends DurableStateStore[A] {

143

/**

144

* Get a source of the most recent changes made to objects with the given tag since the passed in offset.

145

*

146

* Note that this only returns the most recent change to each object; if an object has been updated multiple times

147

* since the offset, only the most recent of those changes will be part of the stream.

148

*

149

* This will return changes that occurred up to when the Source returned by this call is materialized. Changes to

150

* objects made since materialization are not guaranteed to be included in the results.

151

*

152

* @param tag The tag to get changes for

153

* @param offset The offset to get changes since. Must either be NoOffset to get changes since the beginning of time,

154

* or an offset that has been previously returned by this query

155

* @return A source of state changes

156

*/

157

def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]

158

159

/**

160

* Get a source of the most recent changes made to objects with the given tag since the passed-in offset.

161

*

162

* The returned source will never terminate; it effectively watches for changes to the objects and emits changes as

163

* they happen.

164

*

165

* Not all changes that occur are guaranteed to be emitted; this call only guarantees that, eventually, the most

166

* recent change for each object since the offset will be emitted. In particular, multiple updates to a given object

167

* in quick succession are likely to be skipped, with only the last update resulting in a change from this

168

* source.

169

*

170

* @param tag The tag to get changes for

171

* @param offset The offset to get changes since. Must either be NoOffset to get changes since the beginning of time,

172

* or an offset that has been previously returned by this query

173

* @return A source of state changes

174

*/

175

def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]

176

}

177

```

178

179

**Usage Examples:**

180

181

```scala

182

import akka.persistence.query.scaladsl.DurableStateStoreQuery

183

import akka.persistence.query.NoOffset

184

185

val stateStore: DurableStateStoreQuery[UserState] = getDurableStateStore()

186

187

// Query current state changes (finite stream)

188

stateStore

189

.currentChanges("user-states", NoOffset)

190

.runForeach {

191

case updated: UpdatedDurableState[UserState] =>

192

println(s"Current user state: ${updated.value}")

193

buildInitialProjection(updated.persistenceId, updated.value)

194

195

case deleted: DeletedDurableState[UserState] =>

196

println(s"User ${deleted.persistenceId} is deleted")

197

}

198

.onComplete {

199

case Success(_) => println("Finished processing current states")

200

case Failure(ex) => println(s"Failed to process current states: $ex")

201

}

202

203

// Query live state changes (infinite stream)

204

var lastOffset: Offset = NoOffset

205

206

stateStore

207

.changes("user-states", lastOffset)

208

.runForeach { change =>

209

change match {

210

case updated: UpdatedDurableState[UserState] =>

211

updateReadModel(updated.persistenceId, updated.value)

212

213

case deleted: DeletedDurableState[UserState] =>

214

removeFromReadModel(deleted.persistenceId)

215

}

216

217

// Update offset for resumption

218

lastOffset = change.offset

219

saveOffset(lastOffset)

220

}

221

```

222

223

### Durable State Store Paged Persistence IDs Query

224

225

Query interface for paginated persistence IDs from durable state stores.

226

227

```scala { .api }

228

/**

229

* A plugin may optionally support this query by implementing this trait.

230

*/

231

trait DurableStateStorePagedPersistenceIdsQuery[A] extends DurableStateStore[A] {

232

/**

233

* Get current persistence ids with pagination support.

234

*

235

* @param afterId Start after this persistence ID (exclusive)

236

* @param limit Maximum number of persistence IDs to return

237

* @return Source of persistence IDs

238

*/

239

def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]

240

}

241

```

242

243

**Usage Examples:**

244

245

```scala

246

import akka.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery

247

248

val stateStore: DurableStateStorePagedPersistenceIdsQuery[UserState] = getDurableStateStore()

249

250

// Get first page of persistence IDs

251

stateStore

252

.currentPersistenceIds(None, 100L)

253

.runForeach { persistenceId =>

254

println(s"Found persistence ID: $persistenceId")

255

256

// Load current state for this persistence ID

257

stateStore

258

.asInstanceOf[DurableStateStore[UserState]]

259

.getObject(persistenceId)

260

.foreach {

261

case Some(GetObjectResult(userState, revision)) =>

262

println(s"Current state for $persistenceId: $userState")

263

case None =>

264

println(s"No state found for $persistenceId")

265

}

266

}

267

268

// Paginate through all persistence IDs

269

def processAllPersistenceIds(afterId: Option[String] = None): Future[Unit] = {

270

stateStore

271

.currentPersistenceIds(afterId, 100L)

272

.runWith(Sink.seq)

273

.flatMap { ids =>

274

println(s"Processing ${ids.size} persistence IDs")

275

276

// Process this batch

277

ids.foreach(processPersistenceId)

278

279

if (ids.size == 100) {

280

// More pages available, continue with next page

281

processAllPersistenceIds(ids.lastOption)

282

} else {

283

// Last page, we're done

284

Future.successful(())

285

}

286

}

287

}

288

```

289

290

### Durable State Store by Slice Query

291

292

Slice-based query interface for durable state stores (typed API).

293

294

```scala { .api }

295

/**

296

* A plugin may optionally support this query by implementing this trait.

297

* API May Change

298

*/

299

trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {

300

/**

301

* Get current state changes by slice range.

302

*/

303

def currentChangesBySlices(

304

entityType: String,

305

minSlice: Int,

306

maxSlice: Int,

307

offset: Offset

308

): Source[DurableStateChange[A], NotUsed]

309

310

/**

311

* Get live state changes by slice range.

312

*/

313

def changesBySlices(

314

entityType: String,

315

minSlice: Int,

316

maxSlice: Int,

317

offset: Offset

318

): Source[DurableStateChange[A], NotUsed]

319

320

/** Get the slice number for a given persistence ID */

321

def sliceForPersistenceId(persistenceId: String): Int

322

323

/** Get slice ranges for distributing the load */

324

def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]

325

}

326

```

327

328

**Usage Examples:**

329

330

```scala

331

import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery

332

333

val stateStore: DurableStateStoreBySliceQuery[UserState] = getTypedDurableStateStore()

334

335

// Query state changes by slice range

336

stateStore

337

.changesBySlices("User", 0, 255, offset)

338

.runForeach { change =>

339

change match {

340

case updated: UpdatedDurableState[UserState] =>

341

println(s"User ${updated.persistenceId} updated in slice ${stateStore.sliceForPersistenceId(updated.persistenceId)}")

342

processUserUpdate(updated.value)

343

344

case deleted: DeletedDurableState[UserState] =>

345

println(s"User ${deleted.persistenceId} deleted")

346

processUserDeletion(deleted.persistenceId)

347

}

348

}

349

350

// Distribute state processing across multiple slices

351

val sliceRanges = stateStore.sliceRanges(4)

352

sliceRanges.zipWithIndex.foreach { case (range, processorId) =>

353

println(s"Processor $processorId handles slices ${range.start} to ${range.end}")

354

355

stateStore

356

.changesBySlices("User", range.start, range.end, offset)

357

.runForeach { change =>

358

processStateChangeInProcessor(processorId, change)

359

}

360

}

361

```

362

363

## State Processing Patterns

364

365

### State Projection Building

366

367

Build read model projections from state changes:

368

369

```scala

370

case class UserProjection(

371

persistenceId: String,

372

name: String,

373

email: String,

374

lastUpdated: Long,

375

revision: Long

376

)

377

378

class UserProjectionBuilder(stateStore: DurableStateStoreQuery[UserState]) {

379

380

def buildProjections(): Future[Map[String, UserProjection]] = {

381

stateStore

382

.currentChanges("user-states", NoOffset)

383

.runFold(Map.empty[String, UserProjection]) { (projections, change) =>

384

change match {

385

case updated: UpdatedDurableState[UserState] =>

386

val projection = UserProjection(

387

persistenceId = updated.persistenceId,

388

name = updated.value.name,

389

email = updated.value.email,

390

lastUpdated = updated.timestamp,

391

revision = updated.revision

392

)

393

projections + (updated.persistenceId -> projection)

394

395

case deleted: DeletedDurableState[UserState] =>

396

projections - deleted.persistenceId

397

}

398

}

399

}

400

401

def maintainProjections(initialProjections: Map[String, UserProjection]): Unit = {

402

var projections = initialProjections

403

var lastOffset: Offset = NoOffset

404

405

stateStore

406

.changes("user-states", lastOffset)

407

.runForeach { change =>

408

change match {

409

case updated: UpdatedDurableState[UserState] =>

410

val projection = UserProjection(

411

persistenceId = updated.persistenceId,

412

name = updated.value.name,

413

email = updated.value.email,

414

lastUpdated = updated.timestamp,

415

revision = updated.revision

416

)

417

projections = projections + (updated.persistenceId -> projection)

418

saveProjection(projection)

419

420

case deleted: DeletedDurableState[UserState] =>

421

projections = projections - deleted.persistenceId

422

deleteProjection(deleted.persistenceId)

423

}

424

425

lastOffset = change.offset

426

saveOffset(lastOffset)

427

}

428

}

429

}

430

```

431

432

### State Change Filtering

433

434

Filter state changes based on criteria:

435

436

```scala

437

// Filter by revision numbers

438

stateStore

439

.changes("user-states", offset)

440

.filter {

441

case updated: UpdatedDurableState[UserState] => updated.revision > 5L

442

case deleted: DeletedDurableState[UserState] => deleted.revision > 5L

443

}

444

.runForeach(processImportantChange)

445

446

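// Filter by timestamp (last hour only). NOTE: timestamp is declared on UpdatedDurableState and DeletedDurableState, not on the DurableStateChange trait, so match on the concrete type before reading it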

447

val oneHourAgo = System.currentTimeMillis() - (60 * 60 * 1000)

448

449

stateStore

450

.changes("user-states", offset)

451

.filter(_.timestamp > oneHourAgo)

452

.runForeach(processRecentChange)

453

454

// Filter by state content

455

stateStore

456

.changes("user-states", offset)

457

.collect {

458

case updated: UpdatedDurableState[UserState] if updated.value.isActive =>

459

updated

460

}

461

.runForeach(processActiveUserUpdate)

462

```

463

464

### State Change Aggregation

465

466

Aggregate state changes for analytics:

467

468

```scala

469

case class StateChangeMetrics(

470

totalUpdates: Long,

471

totalDeletes: Long,

472

uniqueEntities: Set[String],

473

lastProcessed: Long

474

)

475

476

def aggregateStateChanges(): Future[StateChangeMetrics] = {

477

stateStore

478

.currentChanges("user-states", NoOffset)

479

.runFold(StateChangeMetrics(0L, 0L, Set.empty, 0L)) { (metrics, change) =>

480

change match {

481

case updated: UpdatedDurableState[UserState] =>

482

metrics.copy(

483

totalUpdates = metrics.totalUpdates + 1,

484

uniqueEntities = metrics.uniqueEntities + updated.persistenceId,

485

lastProcessed = math.max(metrics.lastProcessed, updated.timestamp)

486

)

487

488

case deleted: DeletedDurableState[UserState] =>

489

metrics.copy(

490

totalDeletes = metrics.totalDeletes + 1,

491

uniqueEntities = metrics.uniqueEntities + deleted.persistenceId,

492

lastProcessed = math.max(metrics.lastProcessed, deleted.timestamp)

493

)

494

}

495

}

496

}

497

```

498

499

## Error Handling

500

501

### Change Processing Failures

502

503

Handle failures in state change processing:

504

505

```scala

506

stateStore

507

.changes("user-states", offset)

508

.recover {

509

case ex: Exception =>

510

println(s"Error in state change stream: $ex")

511

// Emit one placeholder element in place of the failure; after recover the stream completes (it is not restarted)

512

UpdatedDurableState("error", 0L, UserState.empty, NoOffset, System.currentTimeMillis())

513

}

514

.runForeach { change =>

515

try {

516

processStateChange(change)

517

} catch {

518

case ex: Exception =>

519

println(s"Failed to process state change: $ex")

520

handleProcessingFailure(change, ex)

521

}

522

}

523

```

524

525

### Offset Management

526

527

Safely handle offset storage and retrieval:

528

529

```scala

530

class StateChangeProcessor(stateStore: DurableStateStoreQuery[UserState]) {

531

private var currentOffset: Offset = loadStoredOffset().getOrElse(NoOffset)

532

533

def start(): Unit = {

534

stateStore

535

.changes("user-states", currentOffset)

536

.runForeach { change =>

537

try {

538

processChange(change)

539

currentOffset = change.offset

540

saveOffset(currentOffset)

541

} catch {

542

case ex: Exception =>

543

println(s"Processing failed, keeping offset at $currentOffset: $ex")

544

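// Offset is not advanced for this change, but the stream keeps running: the next successful change overwrites the offset, so this failed change is skipped on resume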

545

}

546

}

547

}

548

549

def restart(): Unit = {

550

// Reload offset and restart processing

551

currentOffset = loadStoredOffset().getOrElse(NoOffset)

552

start()

553

}

554

}

555

```

556

557

## Java API

558

559

Java API equivalents are available in the `javadsl` package:

560

561

```java

562

import akka.persistence.query.javadsl.DurableStateStoreQuery;

563

import akka.persistence.query.UpdatedDurableState;

564

import akka.persistence.query.DeletedDurableState;

565

566

// Java API usage

567

DurableStateStoreQuery<UserState> stateStore = getJavaDurableStateStore();

568

569

// Query current changes

570

stateStore

571

.currentChanges("user-states", NoOffset.getInstance())

572

.runForeach(change -> {

573

if (change instanceof UpdatedDurableState) {

574

UpdatedDurableState<UserState> updated = (UpdatedDurableState<UserState>) change;

575

System.out.println("Updated: " + updated.persistenceId());

576

processUserUpdate(updated.value());

577

} else if (change instanceof DeletedDurableState) {

578

DeletedDurableState<UserState> deleted = (DeletedDurableState<UserState>) change;

579

System.out.println("Deleted: " + deleted.persistenceId());

580

processUserDeletion(deleted.persistenceId());

581

}

582

}, system);

583

```

584

585

## Types

586

587

```scala { .api }

588

case class UserState(

589

name: String,

590

email: String,

591

isActive: Boolean,

592

preferences: Map[String, String]

593

) {

594

def isEmpty: Boolean = name.isEmpty && email.isEmpty

595

}

596

597

object UserState {

598

val empty: UserState = UserState("", "", false, Map.empty)

599

}

600

601

case class GetObjectResult[A](value: A, revision: Long)

602

603

trait DurableStateStore[A] {

604

def getObject(persistenceId: String): Future[Option[GetObjectResult[A]]]

605

def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done]

606

def deleteObject(persistenceId: String): Future[Done]

607

}

608

609

import akka.Done

610

import scala.collection.immutable

611

import akka.stream.scaladsl.{Source, Sink}

612

import akka.NotUsed

613

import scala.concurrent.Future

614

import scala.util.{Success, Failure}

615

```