or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

durable-state.mdevent-envelopes.mdextension.mdindex.mdjournal-implementations.mdoffsets.mdtyped-queries.mduntyped-queries.md

untyped-queries.mddocs/

0

# Untyped Query API

1

2

The untyped (original) query API provides standardized interfaces for querying persisted events and persistence IDs. These traits define optional capabilities that read journal implementations may support.

3

4

## Capabilities

5

6

### Base Read Journal

7

8

Base marker trait for all Scala read journal implementations.

9

10

```scala { .api }

11

/**

12

* API for reading persistent events and information derived from stored persistent events.

13

*

14

* The purpose of the API is not to enforce compatibility between different

15

* journal implementations, because the technical capabilities may be very different.

16

* The interface is very open so that different journals may implement specific queries.

17

*/

18

trait ReadJournal

19

```

20

21

### Events by Persistence ID

22

23

Query events for a specific persistent actor by its persistence ID.

24

25

```scala { .api }

26

/**

27

* A plugin may optionally support this query by implementing this trait.

28

*/

29

trait EventsByPersistenceIdQuery extends ReadJournal {

30

/**

31

* Query events for a specific PersistentActor identified by persistenceId.

32

*

33

* You can retrieve a subset of all events by specifying fromSequenceNr and toSequenceNr

34

* or use 0L and Long.MaxValue respectively to retrieve all events. The query will

35

* return all the events inclusive of the fromSequenceNr and toSequenceNr values.

36

*

37

* The returned event stream should be ordered by sequence number.

38

*

39

* The stream is not completed when it reaches the end of the currently stored events,

40

* but it continues to push new events when new events are persisted.

41

*/

42

def eventsByPersistenceId(

43

persistenceId: String,

44

fromSequenceNr: Long,

45

toSequenceNr: Long

46

): Source[EventEnvelope, NotUsed]

47

}

48

```

49

50

**Usage Examples:**

51

52

```scala

53

import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery

54

import akka.stream.scaladsl.Sink

55

56

// Implement the query trait

57

class MyReadJournal extends ReadJournal with EventsByPersistenceIdQuery {

58

def eventsByPersistenceId(

59

persistenceId: String,

60

fromSequenceNr: Long,

61

toSequenceNr: Long

62

): Source[EventEnvelope, NotUsed] = {

63

// Implementation specific logic

64

Source.empty

65

}

66

}

67

68

// Use the query

69

val readJournal: EventsByPersistenceIdQuery = getMyReadJournal()

70

71

// Query all events for a persistence ID (live stream)

72

readJournal

73

.eventsByPersistenceId("user-123", 0L, Long.MaxValue)

74

.runWith(Sink.foreach { envelope =>

75

println(s"Event: ${envelope.event} at sequence ${envelope.sequenceNr}")

76

})

77

78

// Query specific sequence number range

79

readJournal

80

.eventsByPersistenceId("order-456", 10L, 50L)

81

.runForeach { envelope =>

82

processEvent(envelope.event)

83

}

84

```

85

86

### Current Events by Persistence ID

87

88

Query current (finite) events for a specific persistent actor.

89

90

```scala { .api }

91

/**

92

* A plugin may optionally support this query by implementing this trait.

93

*/

94

trait CurrentEventsByPersistenceIdQuery extends ReadJournal {

95

/**

96

* Same type of query as EventsByPersistenceIdQuery#eventsByPersistenceId but the event stream

97

* is completed immediately when it reaches the end of the currently stored events.

98

*/

99

def currentEventsByPersistenceId(

100

persistenceId: String,

101

fromSequenceNr: Long,

102

toSequenceNr: Long

103

): Source[EventEnvelope, NotUsed]

104

}

105

```

106

107

**Usage Examples:**

108

109

```scala

110

import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery

111

112

// Query current events (finite stream)

113

val readJournal: CurrentEventsByPersistenceIdQuery = getMyReadJournal()

114

115

readJournal

116

.currentEventsByPersistenceId("user-123", 0L, Long.MaxValue)

117

.runForeach { envelope =>

118

println(s"Historical event: ${envelope.event}")

119

}

120

.onComplete {

121

case Success(_) => println("Finished processing historical events")

122

case Failure(ex) => println(s"Failed: $ex")

123

}

124

```

125

126

### Events by Tag

127

128

Query events that have a specific tag across all persistence IDs.

129

130

```scala { .api }

131

/**

132

* A plugin may optionally support this query by implementing this trait.

133

*/

134

trait EventsByTagQuery extends ReadJournal {

135

/**

136

* Query events that have a specific tag. A tag can for example correspond to an

137

* aggregate root type (in DDD terminology).

138

*

139

* The consumer can keep track of its current position in the event stream by storing the

140

* offset and restart the query from a given offset after a crash/restart.

141

*

142

* The returned event stream should be ordered by offset if possible, but this can also be

143

* difficult to fulfill for a distributed data store. The order must be documented by the

144

* read journal plugin.

145

*

146

* The stream is not completed when it reaches the end of the currently stored events,

147

* but it continues to push new events when new events are persisted.

148

*/

149

def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]

150

}

151

```

152

153

**Usage Examples:**

154

155

```scala

156

import akka.persistence.query.scaladsl.EventsByTagQuery

157

import akka.persistence.query.{NoOffset, Sequence}

158

159

val readJournal: EventsByTagQuery = getMyReadJournal()

160

161

// Query events by tag from beginning (live stream)

162

readJournal

163

.eventsByTag("user-events", NoOffset)

164

.runForeach { envelope =>

165

println(s"Tagged event: ${envelope.event} from ${envelope.persistenceId}")

166

}

167

168

// Resume from specific offset

169

val lastProcessedOffset = Sequence(1000L)

170

readJournal

171

.eventsByTag("order-events", lastProcessedOffset)

172

.runForeach { envelope =>

173

processTaggedEvent(envelope)

174

saveOffset(envelope.offset) // Save for resumption

175

}

176

```

177

178

### Current Events by Tag

179

180

Query current (finite) events that have a specific tag.

181

182

```scala { .api }

183

/**

184

* A plugin may optionally support this query by implementing this trait.

185

*/

186

trait CurrentEventsByTagQuery extends ReadJournal {

187

/**

188

* Same type of query as EventsByTagQuery#eventsByTag but the event stream

189

* is completed immediately when it reaches the end of the currently stored events.

190

*/

191

def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]

192

}

193

```

194

195

**Usage Examples:**

196

197

```scala

198

import akka.persistence.query.scaladsl.CurrentEventsByTagQuery

199

200

val readJournal: CurrentEventsByTagQuery = getMyReadJournal()

201

202

// Process all current events with tag (finite stream)

203

readJournal

204

.currentEventsByTag("batch-process", NoOffset)

205

.runForeach { envelope =>

206

processBatchEvent(envelope.event)

207

}

208

.onComplete {

209

case Success(_) => println("Batch processing complete")

210

case Failure(ex) => println(s"Batch processing failed: $ex")

211

}

212

```

213

214

### Persistence IDs Query

215

216

Query all persistence IDs in the journal.

217

218

```scala { .api }

219

/**

220

* A plugin may optionally support this query by implementing this trait.

221

*/

222

trait PersistenceIdsQuery extends ReadJournal {

223

/**

224

* Query all PersistentActor identifiers, i.e. as defined by the

225

* persistenceId of the PersistentActor.

226

*

227

* The stream is not completed when it reaches the end of the currently used persistenceIds,

228

* but it continues to push new persistenceIds when new persistent actors are created.

229

*/

230

def persistenceIds(): Source[String, NotUsed]

231

}

232

```

233

234

**Usage Examples:**

235

236

```scala

237

import akka.persistence.query.scaladsl.PersistenceIdsQuery

238

239

val readJournal: PersistenceIdsQuery = getMyReadJournal()

240

241

// Get all persistence IDs (live stream)

242

readJournal

243

.persistenceIds()

244

.runForeach { persistenceId =>

245

println(s"Found persistence ID: $persistenceId")

246

247

// Query events for each persistence ID

248

readJournal

249

.asInstanceOf[EventsByPersistenceIdQuery]

250

.eventsByPersistenceId(persistenceId, 0L, Long.MaxValue)

251

.take(10) // Limit to first 10 events

252

.runForeach { envelope =>

253

println(s" Event: ${envelope.event}")

254

}

255

}

256

```

257

258

### Current Persistence IDs Query

259

260

Query all current (finite) persistence IDs in the journal.

261

262

```scala { .api }

263

/**

264

* A plugin may optionally support this query by implementing this trait.

265

*/

266

trait CurrentPersistenceIdsQuery extends ReadJournal {

267

/**

268

* Same type of query as PersistenceIdsQuery#persistenceIds but the stream

269

* is completed immediately when it reaches the end of the currently used persistenceIds.

270

*/

271

def currentPersistenceIds(): Source[String, NotUsed]

272

}

273

```

274

275

**Usage Examples:**

276

277

```scala

278

import akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery

279

280

val readJournal: CurrentPersistenceIdsQuery = getMyReadJournal()

281

282

// Get all current persistence IDs (finite stream)

283

readJournal

284

.currentPersistenceIds()

285

.runForeach { persistenceId =>

286

println(s"Current persistence ID: $persistenceId")

287

}

288

.onComplete {

289

case Success(_) => println("Finished scanning persistence IDs")

290

case Failure(ex) => println(s"Scan failed: $ex")

291

}

292

```

293

294

### Paged Persistence IDs Query

295

296

Query persistence IDs with pagination support for large datasets.

297

298

```scala { .api }

299

/**

300

* A plugin ReadJournal may optionally support this query by implementing this trait.

301

*/

302

trait PagedPersistenceIdsQuery extends ReadJournal {

303

/**

304

* Get the current persistence ids.

305

*

306

* Not all plugins may support in database paging, and may simply use drop/take Akka streams operators

307

* to manipulate the result set according to the paging parameters.

308

*

309

* @param afterId The ID to start returning results from, or None to return all ids. This should be an id

310

* returned from a previous invocation of this command. Callers should not assume that ids are

311

* returned in sorted order.

312

* @param limit The maximum results to return. Use Long.MaxValue to return all results. Must be greater than zero.

313

* @return A source containing all the persistence ids, limited as specified.

314

*/

315

def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]

316

}

317

```

318

319

**Usage Examples:**

320

321

```scala

322

import akka.persistence.query.scaladsl.PagedPersistenceIdsQuery

323

324

val readJournal: PagedPersistenceIdsQuery = getMyReadJournal()

325

326

// Get first page of persistence IDs

327

readJournal

328

.currentPersistenceIds(None, 100)

329

.runFold(List.empty[String])(_ :+ _)

330

.map { firstPage =>

331

println(s"First page: ${firstPage.mkString(", ")}")

332

333

// Get next page starting from last ID

334

if (firstPage.nonEmpty) {

335

val lastId = firstPage.last

336

readJournal

337

.currentPersistenceIds(Some(lastId), 100)

338

.runForeach { nextId =>

339

println(s"Next page ID: $nextId")

340

}

341

}

342

}

343

344

// Get all persistence IDs with unlimited results

345

readJournal

346

.currentPersistenceIds(None, Long.MaxValue)

347

.runForeach { persistenceId =>

348

println(s"Persistence ID: $persistenceId")

349

}

350

```

351

352

## Types

353

354

```scala { .api }

355

import akka.NotUsed

356

import akka.stream.scaladsl.Source

357

358

trait Product4[T1, T2, T3, T4] {

359

def _1: T1

360

def _2: T2

361

def _3: T3

362

def _4: T4

363

}

364

365

case object NotUsed

366

```

367

368

## Query Composition

369

370

### Multiple Query Traits

371

372

Combine multiple query capabilities in a single read journal:

373

374

```scala

375

class ComprehensiveReadJournal extends ReadJournal

376

with EventsByPersistenceIdQuery

377

with CurrentEventsByPersistenceIdQuery

378

with EventsByTagQuery

379

with CurrentEventsByTagQuery

380

with PersistenceIdsQuery

381

with CurrentPersistenceIdsQuery {

382

383

// Implement all query methods

384

def eventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long) = ???

385

def currentEventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long) = ???

386

def eventsByTag(tag: String, offset: Offset) = ???

387

def currentEventsByTag(tag: String, offset: Offset) = ???

388

def persistenceIds() = ???

389

def currentPersistenceIds() = ???

390

}

391

```

392

393

### Capability Detection

394

395

Check if a read journal supports specific query capabilities:

396

397

```scala

398

def checkCapabilities(readJournal: ReadJournal): Unit = {

399

readJournal match {

400

case _: EventsByPersistenceIdQuery =>

401

println("Supports events by persistence ID queries")

402

case _ =>

403

println("Does not support events by persistence ID queries")

404

}

405

406

readJournal match {

407

case _: EventsByTagQuery =>

408

println("Supports events by tag queries")

409

case _ =>

410

println("Does not support events by tag queries")

411

}

412

413

readJournal match {

414

case _: PersistenceIdsQuery =>

415

println("Supports persistence IDs queries")

416

case _ =>

417

println("Does not support persistence IDs queries")

418

}

419

}

420

```

421

422

## Error Handling

423

424

### Query Failures

425

426

Handle failures in query streams:

427

428

```scala

429

readJournal

430

.eventsByTag("user-events", offset)

431

.recover {

432

case _: TimeoutException =>

433

println("Query timed out, restarting...")

434

EventEnvelope.empty // Placeholder

435

}

436

.runForeach { envelope =>

437

try {

438

processEvent(envelope.event)

439

} catch {

440

case ex: Exception =>

441

println(s"Failed to process event: $ex")

442

}

443

}

444

```

445

446

### Offset Validation

447

448

Validate offsets before using in queries:

449

450

```scala

451

def validateOffset(offset: Offset): Boolean = {

452

offset match {

453

case NoOffset => true

454

case Sequence(value) => value >= 0

455

case TimestampOffset(timestamp, _, _) => !timestamp.isBefore(Instant.EPOCH)

456

case _ => false

457

}

458

}

459

460

def safeEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {

461

if (validateOffset(offset)) {

462

readJournal.eventsByTag(tag, offset)

463

} else {

464

Source.failed(new IllegalArgumentException(s"Invalid offset: $offset"))

465

}

466

}

467

```

468

469

## Java API

470

471

All Scala query traits have corresponding Java API equivalents in the `javadsl` package:

472

473

```java

474

// Java API usage

475

import akka.persistence.query.PersistenceQuery;

476

import akka.persistence.query.javadsl.*;

477

478

// Get Java read journal

479

ReadJournal readJournal = PersistenceQuery.get(system)

480

.getReadJournalFor(MyJavaReadJournal.class, "my-journal");

481

482

// Check capabilities and use

483

if (readJournal instanceof EventsByPersistenceIdQuery) {

484

EventsByPersistenceIdQuery query = (EventsByPersistenceIdQuery) readJournal;

485

query.eventsByPersistenceId("user-123", 0L, Long.MAX_VALUE)

486

.runForeach(envelope -> {

487

System.out.println("Event: " + envelope.event());

488

}, system);

489

}

490

```

491

492

## Types

493

494

```scala { .api }

495

import akka.stream.scaladsl.Source

496

import akka.NotUsed

497

import scala.concurrent.Future

498

import scala.util.{Success, Failure}

499

500

trait TimeoutException extends Exception

501

```