or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

durable-state.md event-envelopes.md extension.md index.md journal-implementations.md offsets.md typed-queries.md untyped-queries.md

docs/typed-queries.md

0

# Typed Query API

1

2

The typed query API provides type-safe query interfaces with enriched event envelopes and slice-based querying for horizontal scaling. All typed APIs are marked `@ApiMayChange` and are available in both Scala and Java variants.

3

4

## Capabilities

5

6

### Events by Slice Query

7

8

Query events by entity type and slice range for horizontal scaling and distributed processing.

9

10

```scala { .api }

11

/**

12

* A plugin may optionally support this query by implementing this trait.

13

* API May Change

14

*/

15

trait EventsBySliceQuery extends ReadJournal {

16

/**

17

* Query events for given entity type and slice range. Useful for distributing the load

18

* and implementing resilient query projections.

19

*

20

* @param entityType The entity type to query events for

21

* @param minSlice The minimum slice number (inclusive)

22

* @param maxSlice The maximum slice number (inclusive)

23

* @param offset The offset to start from

24

* @return Source of typed event envelopes

25

*/

26

def eventsBySlices[Event](

27

entityType: String,

28

minSlice: Int,

29

maxSlice: Int,

30

offset: Offset

31

): Source[EventEnvelope[Event], NotUsed]

32

33

/**

34

* Get the slice number for a given persistence ID.

35

* Useful for determining which slice a persistence ID belongs to.

36

*/

37

def sliceForPersistenceId(persistenceId: String): Int

38

39

/**

40

* Get slice ranges for distributing the load across multiple query processors.

41

*

42

* @param numberOfRanges Number of ranges to create

43

* @return Sequence of slice ranges

44

*/

45

def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]

46

}

47

```

48

49

**Usage Examples:**

50

51

```scala

52

import akka.persistence.query.typed.scaladsl.EventsBySliceQuery

53

import akka.persistence.query.typed.EventEnvelope

54

55

val readJournal: EventsBySliceQuery = getTypedReadJournal()

56

57

// Query events from specific slice range

58

readJournal

59

.eventsBySlices[UserEvent]("User", 0, 127, offset)

60

.runForeach { envelope =>

61

println(s"Event: ${envelope.event} from slice ${envelope.slice}")

62

println(s"Entity type: ${envelope.entityType}")

63

println(s"Tags: ${envelope.tags}")

64

}

65

66

// Distribute processing across multiple slices

67

val sliceRanges = readJournal.sliceRanges(4) // Create 4 ranges

68

sliceRanges.zipWithIndex.foreach { case (range, index) =>

69

println(s"Processor $index handles slices ${range.start} to ${range.end}")

70

71

readJournal

72

.eventsBySlices[UserEvent]("User", range.start, range.end, offset)

73

.runForeach { envelope =>

74

processEventInProcessor(index, envelope)

75

}

76

}

77

78

// Check which slice a persistence ID belongs to

79

val slice = readJournal.sliceForPersistenceId("user-12345")

80

println(s"user-12345 belongs to slice $slice")

81

```

82

83

### Current Events by Slice Query

84

85

Query the currently stored events by entity type and slice range. The stream is finite: it completes when it reaches the end of the currently stored events.

86

87

```scala { .api }

88

/**

89

* A plugin may optionally support this query by implementing this trait.

90

* API May Change

91

*/

92

trait CurrentEventsBySliceQuery extends ReadJournal {

93

/**

94

* Same as EventsBySliceQuery#eventsBySlices but the event stream

95

* is completed immediately when it reaches the end of currently stored events.

96

*/

97

def currentEventsBySlices[Event](

98

entityType: String,

99

minSlice: Int,

100

maxSlice: Int,

101

offset: Offset

102

): Source[EventEnvelope[Event], NotUsed]

103

104

def sliceForPersistenceId(persistenceId: String): Int

105

def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]

106

}

107

```

108

109

**Usage Examples:**

110

111

```scala

112

import akka.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery

113

114

val readJournal: CurrentEventsBySliceQuery = getTypedReadJournal()

115

116

// Process all current events from slice range (finite stream)

117

readJournal

118

.currentEventsBySlices[OrderEvent]("Order", 512, 1023, offset)

119

.runForeach { envelope =>

120

processHistoricalEvent(envelope.event)

121

}

122

.onComplete {

123

case Success(_) => println("Finished processing historical events")

124

case Failure(ex) => println(s"Processing failed: $ex")

125

}

126

```

127

128

### Events by Persistence ID Typed Query

129

130

Query events for a specific persistence ID with typed envelopes.

131

132

```scala { .api }

133

/**

134

* A plugin may optionally support this query by implementing this trait.

135

* API May Change

136

*/

137

trait EventsByPersistenceIdTypedQuery extends ReadJournal {

138

/**

139

* Query events for a specific persistence ID with enhanced typed envelopes.

140

*/

141

def eventsByPersistenceIdTyped[Event](

142

persistenceId: String,

143

fromSequenceNr: Long,

144

toSequenceNr: Long

145

): Source[EventEnvelope[Event], NotUsed]

146

}

147

```

148

149

**Usage Examples:**

150

151

```scala

152

import akka.persistence.query.typed.scaladsl.EventsByPersistenceIdTypedQuery

153

154

val readJournal: EventsByPersistenceIdTypedQuery = getTypedReadJournal()

155

156

// Query typed events for persistence ID

157

readJournal

158

.eventsByPersistenceIdTyped[UserEvent]("user-123", 0L, Long.MaxValue)

159

.runForeach { envelope =>

160

println(s"Typed event: ${envelope.event}")

161

println(s"Entity type: ${envelope.entityType}")

162

println(s"Event tags: ${envelope.tags}")

163

164

// Type-safe event processing

165

envelope.event match {

166

case UserCreated(name, email) => println(s"User $name created")

167

case UserUpdated(name, email) => println(s"User $name updated")

168

case UserDeleted(userId) => println(s"User $userId deleted")

169

}

170

}

171

```

172

173

### Current Events by Persistence ID Typed Query

174

175

Query the currently stored events for a specific persistence ID with typed envelopes. The stream completes when it reaches the end of the currently stored events.

176

177

```scala { .api }

178

/**

179

* A plugin may optionally support this query by implementing this trait.

180

* API May Change

181

*/

182

trait CurrentEventsByPersistenceIdTypedQuery extends ReadJournal {

183

/**

184

* Same as EventsByPersistenceIdTypedQuery but completed when reaching

185

* the end of currently stored events.

186

*/

187

def currentEventsByPersistenceIdTyped[Event](

188

persistenceId: String,

189

fromSequenceNr: Long,

190

toSequenceNr: Long

191

): Source[EventEnvelope[Event], NotUsed]

192

}

193

```

194

195

### Events by Persistence ID Starting from Snapshot

196

197

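Query events for a persistence ID starting from a snapshot: the snapshot is loaded and transformed into an event, followed by the events from the sequence number after the snapshot.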

198

199

```scala { .api }

200

/**

201

* A plugin may optionally support this query by implementing this trait.

202

* API May Change

203

*/

204

trait EventsByPersistenceIdStartingFromSnapshotQuery extends ReadJournal {

205

/**

206

* Query events starting from a snapshot. The snapshot is loaded and transformed

207

* to an event, then followed by events from the sequence number after the snapshot.

208

*/

209

def eventsByPersistenceIdStartingFromSnapshot[Snapshot, Event](

210

persistenceId: String,

211

fromSequenceNr: Long,

212

toSequenceNr: Long,

213

transformSnapshot: Snapshot => Event

214

): Source[EventEnvelope[Event], NotUsed]

215

}

216

```

217

218

**Usage Examples:**

219

220

```scala

221

import akka.persistence.query.typed.scaladsl.EventsByPersistenceIdStartingFromSnapshotQuery

222

223

val readJournal: EventsByPersistenceIdStartingFromSnapshotQuery = getTypedReadJournal()

224

225

// Transform snapshot to event and include in stream

226

readJournal

227

.eventsByPersistenceIdStartingFromSnapshot[UserSnapshot, UserEvent](

228

persistenceId = "user-123",

229

fromSequenceNr = 0L,

230

toSequenceNr = Long.MaxValue,

231

transformSnapshot = snapshot => UserStateRestored(snapshot.name, snapshot.email, snapshot.preferences)

232

)

233

.runForeach { envelope =>

234

envelope.event match {

235

case UserStateRestored(name, email, prefs) =>

236

println(s"Restored user state: $name")

237

case otherEvent =>

238

println(s"Regular event: $otherEvent")

239

}

240

}

241

```

242

243

### Events by Slice Starting from Snapshots

244

245

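Query events by entity type and slice range starting from snapshots: snapshots are loaded and transformed into events, followed by the events from the sequence number after each snapshot.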

246

247

```scala { .api }

248

/**

249

* A plugin may optionally support this query by implementing this trait.

250

* API May Change

251

*/

252

trait EventsBySliceStartingFromSnapshotsQuery extends ReadJournal {

253

/**

254

* Query events by slice starting from snapshots. Snapshots are loaded and transformed

255

* to events, then followed by events from the sequence number after the snapshot.

256

*/

257

def eventsBySlicesStartingFromSnapshots[Snapshot, Event](

258

entityType: String,

259

minSlice: Int,

260

maxSlice: Int,

261

offset: Offset,

262

transformSnapshot: Snapshot => Event

263

): Source[EventEnvelope[Event], NotUsed]

264

265

def sliceForPersistenceId(persistenceId: String): Int

266

def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]

267

}

268

```

269

270

**Usage Examples:**

271

272

```scala

273

import akka.persistence.query.typed.scaladsl.EventsBySliceStartingFromSnapshotsQuery

274

275

val readJournal: EventsBySliceStartingFromSnapshotsQuery = getTypedReadJournal()

276

277

// Query slice events starting from snapshots

278

readJournal

279

.eventsBySlicesStartingFromSnapshots[OrderSnapshot, OrderEvent](

280

entityType = "Order",

281

minSlice = 0,

282

maxSlice = 255,

283

offset = offset,

284

transformSnapshot = snapshot => OrderStateRestored(

285

orderId = snapshot.orderId,

286

items = snapshot.items,

287

total = snapshot.total,

288

status = snapshot.status

289

)

290

)

291

.runForeach { envelope =>

292

envelope.event match {

293

case OrderStateRestored(orderId, items, total, status) =>

294

println(s"Restored order $orderId with $total")

295

case regularEvent =>

296

processOrderEvent(regularEvent)

297

}

298

}

299

```

300

301

### Load Event Query

302

303

Load individual events on demand.

304

305

```scala { .api }

306

/**

307

* A plugin may optionally support this query by implementing this trait.

308

* API May Change

309

*/

310

trait LoadEventQuery extends ReadJournal {

311

/**

312

* Load a specific event envelope by persistence ID and sequence number.

313

* Useful for loading events that were not included in the original query result.

314

*/

315

def loadEnvelope[Event](

316

persistenceId: String,

317

sequenceNr: Long

318

): Future[EventEnvelope[Event]]

319

}

320

```

321

322

**Usage Examples:**

323

324

```scala

325

import akka.persistence.query.typed.scaladsl.LoadEventQuery

326

327

val readJournal: LoadEventQuery = getTypedReadJournal()

328

329

// Load specific event on demand

330

def processEnvelopeWithLoading[Event](envelope: EventEnvelope[Event]): Future[Unit] = {

331

envelope.eventOption match {

332

case Some(event) =>

333

Future.successful(processEvent(event))

334

case None if !envelope.filtered =>

335

// Event not loaded, load it on demand

336

readJournal

337

.loadEnvelope[Event](envelope.persistenceId, envelope.sequenceNr)

338

.map(loadedEnvelope => processEvent(loadedEnvelope.event))

339

case None =>

340

// Event was filtered, skip processing

341

Future.successful(())

342

}

343

}

344

345

// Use with slice queries

346

readJournal

347

.asInstanceOf[EventsBySliceQuery]

348

.eventsBySlices[UserEvent]("User", 0, 127, offset)

349

.mapAsync(4)(processEnvelopeWithLoading)

350

.runWith(Sink.ignore)

351

```

352

353

### Event Timestamp Query

354

355

Query timestamps for specific events.

356

357

```scala { .api }

358

/**

359

* A plugin may optionally support this query by implementing this trait.

360

* API May Change

361

*/

362

trait EventTimestampQuery extends ReadJournal {

363

/**

364

* Get the timestamp of a specific event by persistence ID and sequence number.

365

* Returns None if the event doesn't exist.

366

*/

367

def timestampOf(

368

persistenceId: String,

369

sequenceNr: Long

370

): Future[Option[Instant]]

371

}

372

```

373

374

**Usage Examples:**

375

376

```scala

377

import akka.persistence.query.typed.scaladsl.EventTimestampQuery

378

import java.time.Instant

379

380

val readJournal: EventTimestampQuery = getTypedReadJournal()

381

382

// Get timestamp of specific event

383

readJournal

384

.timestampOf("user-123", 42L)

385

.foreach {

386

case Some(timestamp) =>

387

println(s"Event 42 was stored at: $timestamp")

388

case None =>

389

println("Event 42 not found")

390

}

391

392

// Check event age

393

def checkEventAge(persistenceId: String, sequenceNr: Long): Future[Unit] = {

394

readJournal

395

.timestampOf(persistenceId, sequenceNr)

396

.map {

397

case Some(timestamp) =>

398

val age = java.time.Duration.between(timestamp, Instant.now())

399

if (age.toDays > 30) {

400

println(s"Event is ${age.toDays} days old")

401

}

402

case None =>

403

println("Event not found")

404

}

405

}

406

```

407

408

## Slice-Based Processing Patterns

409

410

### Distributed Event Processing

411

412

Use slices to distribute event processing across multiple nodes:

413

414

```scala

415

def startSliceProcessor(processorId: Int, totalProcessors: Int): Unit = {

416

val readJournal: EventsBySliceQuery = getTypedReadJournal()

417

418

// Calculate slice range for this processor

419

val sliceRanges = readJournal.sliceRanges(totalProcessors)

420

val myRange = sliceRanges(processorId)

421

422

println(s"Processor $processorId handling slices ${myRange.start} to ${myRange.end}")

423

424

readJournal

425

.eventsBySlices[DomainEvent]("Entity", myRange.start, myRange.end, offset)

426

.runForeach { envelope =>

427

// Process events for this slice range

428

processEvent(envelope.event)

429

430

// Update offset for this processor

431

saveProcessorOffset(processorId, envelope.offset)

432

}

433

}

434

435

// Start 8 processors

436

(0 until 8).foreach(startSliceProcessor(_, 8))

437

```

438

439

### Entity Type Filtering

440

441

Process different entity types with separate processors:

442

443

```scala

444

val entityTypes = List("User", "Order", "Payment", "Inventory")

445

446

entityTypes.foreach { entityType =>

447

readJournal

448

.eventsBySlices[DomainEvent](entityType, 0, 1023, offset)

449

.runForeach { envelope =>

450

entityType match {

451

case "User" => processUserEvent(envelope.event)

452

case "Order" => processOrderEvent(envelope.event)

453

case "Payment" => processPaymentEvent(envelope.event)

454

case "Inventory" => processInventoryEvent(envelope.event)

455

}

456

}

457

}

458

```

459

460

### Snapshot Integration

461

462

Combine snapshots with event streams for efficient state reconstruction:

463

464

```scala

465

def buildProjectionFromSnapshots[State, Event](

466

entityType: String,

467

slice: Int,

468

initialState: State,

469

applyEvent: (State, Event) => State,

470

applySnapshot: UserSnapshot => State

471

): Future[State] = {

472

473

readJournal

474

.eventsBySlicesStartingFromSnapshots[UserSnapshot, Event](

475

entityType = entityType,

476

minSlice = slice,

477

maxSlice = slice,

478

offset = TimestampOffset.Zero,

479

transformSnapshot = snapshot => SnapshotRestored(snapshot).asInstanceOf[Event]

480

)

481

.runFold(initialState) { (state, envelope) =>

482

envelope.event match {

483

case SnapshotRestored(snapshot) =>

484

applySnapshot(snapshot.asInstanceOf[UserSnapshot])

485

case event =>

486

applyEvent(state, event)

487

}

488

}

489

}

490

```

491

492

## Java API

493

494

All typed Scala query traits have corresponding Java API equivalents:

495

496

```java

497

import akka.persistence.query.typed.javadsl.*;

498

import java.util.concurrent.CompletionStage;

499

import java.util.List;

500

import akka.japi.Pair;

501

502

// Java API usage

503

EventsBySliceQuery readJournal = getJavaTypedReadJournal();

504

505

// Query events by slice

506

readJournal

507

.eventsBySlices(UserEvent.class, "User", 0, 127, offset)

508

.runForeach(envelope -> {

509

System.out.println("Event: " + envelope.getEvent());

510

System.out.println("Entity type: " + envelope.entityType());

511

System.out.println("Slice: " + envelope.slice());

512

}, system);

513

514

// Get slice ranges for distribution

515

List<Pair<Integer, Integer>> ranges = readJournal.sliceRanges(4);

516

ranges.forEach(range -> {

517

System.out.println("Range: " + range.first() + " to " + range.second());

518

});

519

520

// Load event on demand

521

LoadEventQuery loadQuery = (LoadEventQuery) readJournal;

522

CompletionStage<EventEnvelope<UserEvent>> future =

523

loadQuery.loadEnvelope(UserEvent.class, "user-123", 42L);

524

```

525

526

## Error Handling

527

528

### Event Loading Failures

529

530

Handle failures when loading events on demand:

531

532

```scala

533

def safeLoadEvent[Event](

534

persistenceId: String,

535

sequenceNr: Long

536

): Future[Option[Event]] = {

537

readJournal

538

.asInstanceOf[LoadEventQuery]

539

.loadEnvelope[Event](persistenceId, sequenceNr)

540

.map(envelope => Some(envelope.event))

541

.recover {

542

case _: NoSuchElementException => None

543

case ex =>

544

println(s"Failed to load event: $ex")

545

None

546

}

547

}

548

```

549

550

### Slice Processing Failures

551

552

Handle failures in slice-based processing:

553

554

```scala

555

def resilientSliceProcessing(slice: Int): Unit = {

556

readJournal

557

.eventsBySlices[DomainEvent]("Entity", slice, slice, offset)

558

.recover {

559

case ex: Exception =>

560

println(s"Error in slice $slice: $ex")

561

// Return empty envelope or restart logic

562

EventEnvelope.empty

563

}

564

.runForeach { envelope =>

565

try {

566

processEvent(envelope.event)

567

} catch {

568

case ex: Exception =>

569

println(s"Failed to process event in slice $slice: $ex")

570

}

571

}

572

}

573

```

574

575

## Types

576

577

```scala { .api }

578

case class UserEvent(userId: String, eventType: String, data: Map[String, Any])

579

case class OrderEvent(orderId: String, eventType: String, data: Map[String, Any])

580

case class DomainEvent(entityId: String, eventType: String, payload: Any)

581

582

case class UserCreated(name: String, email: String)

583

case class UserUpdated(name: String, email: String)

584

case class UserDeleted(userId: String)

585

case class UserStateRestored(name: String, email: String, preferences: Map[String, String])

586

587

case class OrderStateRestored(orderId: String, items: List[String], total: BigDecimal, status: String)

588

case class SnapshotRestored[T](snapshot: T)

589

590

case class UserSnapshot(name: String, email: String, preferences: Map[String, String])

591

case class OrderSnapshot(orderId: String, items: List[String], total: BigDecimal, status: String)

592

593

import scala.concurrent.Future

594

import java.time.Instant

595

import akka.stream.scaladsl.{Source, Sink}

596

import akka.NotUsed

597

import scala.collection.immutable

598

```