# Journal Implementations

Akka Persistence Query provides concrete read journal implementations for specific storage backends. These implementations demonstrate how to build read journals and provide working examples for common use cases.

## Capabilities

### LevelDB Read Journal (Deprecated)

Reference implementation using LevelDB storage backend. Note that this implementation is deprecated as of Akka 2.6.15.

```scala { .api }
/**
 * LevelDB read journal implementation.
 * @deprecated Use another journal implementation since 2.6.15
 */
class LeveldbReadJournal extends ReadJournal
  with PersistenceIdsQuery
  with CurrentPersistenceIdsQuery
  with EventsByPersistenceIdQuery
  with CurrentEventsByPersistenceIdQuery
  with EventsByTagQuery
  with CurrentEventsByTagQuery {

  def persistenceIds(): Source[String, NotUsed]
  def currentPersistenceIds(): Source[String, NotUsed]

  def eventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Source[EventEnvelope, NotUsed]
  def currentEventsByPersistenceId(persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Source[EventEnvelope, NotUsed]

  def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
  def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}
```

### LevelDB Constants and Factory

Factory methods and constants for LevelDB read journal.

```scala { .api }
/**
 * LevelDB read journal companion object with plugin identifier.
 * @deprecated Use another journal implementation since 2.6.15
 */
object LeveldbReadJournal {
  /** Plugin identifier for configuration */
  val Identifier = "akka.persistence.query.journal.leveldb"
}
```

**Usage Examples:**

```scala
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal

// Note: LevelDB implementation is deprecated
val readJournal = PersistenceQuery(system)
  .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

// Query all persistence IDs
readJournal
  .persistenceIds()
  .take(10)
  .runForeach { persistenceId =>
    println(s"Found persistence ID: $persistenceId")
  }

// Query events by persistence ID
readJournal
  .eventsByPersistenceId("user-123", 0L, Long.MaxValue)
  .runForeach { envelope =>
    println(s"Event: ${envelope.event} at sequence ${envelope.sequenceNr}")
  }

// Query events by tag
readJournal
  .eventsByTag("user-events", NoOffset)
  .runForeach { envelope =>
    println(s"Tagged event: ${envelope.event} from ${envelope.persistenceId}")
  }

// Query current (finite) events
readJournal
  .currentEventsByTag("batch-events", NoOffset)
  .runForeach { envelope =>
    processBatchEvent(envelope.event)
  }
  .onComplete {
    case Success(_) => println("Batch processing complete")
    case Failure(ex) => println(s"Batch processing failed: $ex")
  }
```

### LevelDB Read Journal Provider

Provider implementation for LevelDB read journal plugin.

```scala { .api }
/**
 * LevelDB read journal provider implementation.
 * @deprecated Use another journal implementation since 2.6.15
 */
class LeveldbReadJournalProvider extends ReadJournalProvider {
  def scaladslReadJournal(): scaladsl.ReadJournal
  def javadslReadJournal(): javadsl.ReadJournal
}
```

### Events by Slice Firehose Query

Advanced implementation providing slice-based querying with multiple query capabilities.

```scala { .api }
/**
 * Firehose implementation for slice-based event queries.
 * Provides high-performance event streaming with slice-based distribution.
 */
class EventsBySliceFirehoseQuery extends ReadJournal
  with EventsBySliceQuery
  with EventsBySliceStartingFromSnapshotsQuery
  with EventTimestampQuery
  with LoadEventQuery {

  // EventsBySliceQuery methods
  def eventsBySlices[Event](entityType: String, minSlice: Int, maxSlice: Int, offset: Offset): Source[EventEnvelope[Event], NotUsed]
  def sliceForPersistenceId(persistenceId: String): Int
  def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]

  // EventsBySliceStartingFromSnapshotsQuery methods
  def eventsBySlicesStartingFromSnapshots[Snapshot, Event](
    entityType: String,
    minSlice: Int,
    maxSlice: Int,
    offset: Offset,
    transformSnapshot: Snapshot => Event
  ): Source[EventEnvelope[Event], NotUsed]

  // EventTimestampQuery methods
  def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]]

  // LoadEventQuery methods
  def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): Future[EventEnvelope[Event]]
}
```

### Firehose Constants and Factory

Constants and factory methods for the firehose implementation.

```scala { .api }
/**
 * Firehose query companion object with plugin identifier.
 */
object EventsBySliceFirehoseQuery {
  /** Plugin identifier for configuration */
  val Identifier = "akka.persistence.query.events-by-slice-firehose"
}
```

**Usage Examples:**

```scala
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.typed.scaladsl.EventsBySliceFirehoseQuery

// Get firehose read journal
val firehoseJournal = PersistenceQuery(system)
  .readJournalFor[EventsBySliceFirehoseQuery](EventsBySliceFirehoseQuery.Identifier)

// Query events by slice range
firehoseJournal
  .eventsBySlices[DomainEvent]("User", 0, 127, offset)
  .runForeach { envelope =>
    println(s"Event from slice ${envelope.slice}: ${envelope.event}")
    println(s"Entity type: ${envelope.entityType}")
    println(s"Tags: ${envelope.tags}")
  }

// Distribute processing across slices
val sliceRanges = firehoseJournal.sliceRanges(8)
sliceRanges.zipWithIndex.foreach { case (range, processorId) =>
  println(s"Starting processor $processorId for slices ${range.start}-${range.end}")

  firehoseJournal
    .eventsBySlices[DomainEvent]("Order", range.start, range.end, offset)
    .runForeach { envelope =>
      processInProcessor(processorId, envelope)
    }
}

// Load specific events on demand
firehoseJournal
  .loadEnvelope[UserEvent]("user-123", 42L)
  .foreach { envelope =>
    println(s"Loaded event: ${envelope.event}")
  }

// Query event timestamps
firehoseJournal
  .timestampOf("user-123", 42L)
  .foreach {
    case Some(timestamp) => println(s"Event timestamp: $timestamp")
    case None => println("Event not found")
  }

// Query with snapshot integration
firehoseJournal
  .eventsBySlicesStartingFromSnapshots[UserSnapshot, UserEvent](
    entityType = "User",
    minSlice = 0,
    maxSlice = 255,
    offset = TimestampOffset.Zero,
    transformSnapshot = snapshot => UserSnapshotRestored(snapshot.userId, snapshot.state)
  )
  .runForeach { envelope =>
    envelope.event match {
      case UserSnapshotRestored(userId, state) =>
        println(s"Restored snapshot for $userId")
        initializeUserState(userId, state)
      case regularEvent =>
        processUserEvent(regularEvent)
    }
  }
```

### Firehose Read Journal Provider

Provider implementation for the firehose read journal.

```scala { .api }
/**
 * Provider for EventsBySliceFirehose query implementation.
 */
class EventsBySliceFirehoseReadJournalProvider extends ReadJournalProvider {
  def scaladslReadJournal(): scaladsl.ReadJournal
  def javadslReadJournal(): javadsl.ReadJournal
}
```

## Configuration

### LevelDB Configuration

Example configuration for LevelDB read journal (deprecated):

```hocon
# LevelDB read journal configuration (deprecated)
akka.persistence.query.journal.leveldb {
  # Implementation class
  class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"

  # Reference to the write journal plugin
  write-plugin = "akka.persistence.journal.leveldb"

  # Directory where journal files are stored
  dir = "target/journal"

  # Maximum number of events to buffer
  max-buffer-size = 100

  # Refresh interval for live queries
  refresh-interval = 1s
}
```

### Firehose Configuration

Example configuration for firehose implementation:

```hocon
# Firehose read journal configuration
akka.persistence.query.events-by-slice-firehose {
  # Implementation class
  class = "akka.persistence.query.typed.EventsBySliceFirehoseReadJournalProvider"

  # Number of slices for horizontal partitioning
  number-of-slices = 1024

  # Batch size for event retrieval
  batch-size = 1000

  # Refresh interval for live queries
  refresh-interval = 500ms

  # Event loading configuration
  event-loading {
    # Timeout for loading individual events
    timeout = 10s

    # Parallelism for concurrent event loading
    parallelism = 4
  }
}
```

## Implementation Patterns

### Custom Read Journal Implementation

Example of implementing a custom read journal:

```scala
class CustomReadJournal(system: ExtendedActorSystem, config: Config)
  extends ReadJournal
  with EventsByPersistenceIdQuery
  with EventsByTagQuery
  with PersistenceIdsQuery {

  private val backend = new CustomJournalBackend(config)

  override def eventsByPersistenceId(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long
  ): Source[EventEnvelope, NotUsed] = {
    Source
      .unfoldAsync(fromSequenceNr) { seqNr =>
        if (seqNr > toSequenceNr) {
          Future.successful(None)
        } else {
          backend.loadEvent(persistenceId, seqNr).map {
            case Some(event) =>
              val envelope = EventEnvelope(
                offset = Sequence(seqNr),
                persistenceId = persistenceId,
                sequenceNr = seqNr,
                event = event,
                timestamp = System.currentTimeMillis()
              )
              Some((seqNr + 1, envelope))
            case None =>
              None
          }
        }
      }
  }

  override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
    val startOffset = offset match {
      case Sequence(value) => value
      case NoOffset => 0L
      case _ => throw new IllegalArgumentException(s"Unsupported offset type: $offset")
    }

    Source
      .unfoldAsync(startOffset) { currentOffset =>
        backend.loadEventsByTag(tag, currentOffset, batchSize = 100).map { events =>
          if (events.nonEmpty) {
            val lastOffset = events.last.sequenceNr
            Some((lastOffset + 1, events))
          } else {
            // No more events, but this is a live query so we continue
            Thread.sleep(1000) // Simple polling - real implementation would use more sophisticated approach
            Some((currentOffset, List.empty))
          }
        }
      }
      .mapConcat(identity)
  }

  override def persistenceIds(): Source[String, NotUsed] = {
    Source
      .unfoldAsync(Option.empty[String]) { afterId =>
        backend.loadPersistenceIds(afterId, limit = 100).map { ids =>
          if (ids.nonEmpty) {
            Some((ids.lastOption, ids))
          } else {
            // No more IDs, but this is a live query
            Thread.sleep(1000)
            Some((afterId, List.empty))
          }
        }
      }
      .mapConcat(identity)
  }
}

// Custom provider
class CustomReadJournalProvider extends ReadJournalProvider {
  override def scaladslReadJournal(): scaladsl.ReadJournal = {
    new CustomReadJournal(system, config)
  }

  override def javadslReadJournal(): javadsl.ReadJournal = {
    new CustomJavaReadJournal(scaladslReadJournal())
  }
}
```

### Plugin Registration

Register custom journal implementation:

```hocon
# Custom read journal plugin
my-custom-journal {
  class = "com.example.CustomReadJournalProvider"

  # Custom configuration
  connection-string = "jdbc:postgresql://localhost/mydb"
  batch-size = 1000
  refresh-interval = 1s
}
```

Usage:

```scala
val customJournal = PersistenceQuery(system)
  .readJournalFor[CustomReadJournal]("my-custom-journal")
```

### Java API Implementation

Java wrapper for custom read journal:

```java
public class CustomJavaReadJournal implements javadsl.ReadJournal,
    javadsl.EventsByPersistenceIdQuery,
    javadsl.EventsByTagQuery {

  private final CustomReadJournal scaladslJournal;

  public CustomJavaReadJournal(CustomReadJournal scaladslJournal) {
    this.scaladslJournal = scaladslJournal;
  }

  @Override
  public Source<EventEnvelope, NotUsed> eventsByPersistenceId(
      String persistenceId,
      long fromSequenceNr,
      long toSequenceNr) {
    return scaladslJournal
        .eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
        .asJava();
  }

  @Override
  public Source<EventEnvelope, NotUsed> eventsByTag(String tag, Offset offset) {
    return scaladslJournal
        .eventsByTag(tag, offset)
        .asJava();
  }
}
```

## Performance Considerations

### Batch Processing

Optimize query performance with batching:

```scala
// Batch event loading
def loadEventsBatch(persistenceIds: List[String]): Future[List[EventEnvelope]] = {
  Source(persistenceIds)
    .mapAsync(parallelism = 4) { persistenceId =>
      readJournal
        .currentEventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
        .take(100) // Limit per persistence ID
        .runWith(Sink.seq)
    }
    .runWith(Sink.seq)
    .map(_.flatten.toList)
}

// Batch tag queries
def loadTaggedEventsBatch(tags: List[String]): Future[List[EventEnvelope]] = {
  Source(tags)
    .mapAsync(parallelism = 2) { tag =>
      readJournal
        .currentEventsByTag(tag, NoOffset)
        .take(1000) // Limit per tag
        .runWith(Sink.seq)
    }
    .runWith(Sink.seq)
    .map(_.flatten.toList)
}
```

### Slice Distribution

Optimize slice-based processing:

```scala
def optimizedSliceProcessing(totalSlices: Int = 1024, processors: Int = 8): Unit = {
  val slicesPerProcessor = totalSlices / processors

  (0 until processors).foreach { processorId =>
    val minSlice = processorId * slicesPerProcessor
    val maxSlice = if (processorId == processors - 1) {
      totalSlices - 1 // Last processor handles remaining slices
    } else {
      (processorId + 1) * slicesPerProcessor - 1
    }

    println(s"Processor $processorId: slices $minSlice to $maxSlice")

    firehoseJournal
      .eventsBySlices[DomainEvent]("Entity", minSlice, maxSlice, offset)
      .buffer(size = 1000, OverflowStrategy.backpressure)
      .runForeach { envelope =>
        processEventOptimized(processorId, envelope)
      }
  }
}
```

## Error Handling

### Journal Implementation Errors

Handle errors in custom journal implementations:

```scala
class ResilientCustomReadJournal extends CustomReadJournal {

  override def eventsByPersistenceId(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long
  ): Source[EventEnvelope, NotUsed] = {

    super.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
      .recover {
        case _: TimeoutException =>
          println(s"Timeout loading events for $persistenceId, retrying...")
          EventEnvelope.empty // Placeholder
        case ex: Exception =>
          println(s"Error loading events for $persistenceId: $ex")
          throw ex
      }
      .filter(_ != EventEnvelope.empty) // Filter out placeholders
  }
}
```

### Plugin Loading Failures

Handle plugin loading and configuration errors:

```scala
def safeLoadReadJournal[T <: ReadJournal](pluginId: String): Option[T] = {
  try {
    Some(PersistenceQuery(system).readJournalFor[T](pluginId))
  } catch {
    case _: ClassNotFoundException =>
      println(s"Plugin class not found for $pluginId")
      None
    case _: ConfigurationException =>
      println(s"Configuration error for plugin $pluginId")
      None
    case ex: Exception =>
      println(s"Failed to load plugin $pluginId: $ex")
      None
  }
}

// Usage with fallback
val readJournal = safeLoadReadJournal[LeveldbReadJournal](LeveldbReadJournal.Identifier)
  .getOrElse {
    println("Falling back to in-memory journal")
    getInMemoryReadJournal()
  }
```

## Java API

Java API for journal implementations:

```java
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
import akka.persistence.query.typed.javadsl.EventsBySliceFirehoseQuery;

// LevelDB Java API (deprecated)
LeveldbReadJournal leveldbJournal = PersistenceQuery.get(system)
    .getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier);

// Firehose Java API
EventsBySliceFirehoseQuery firehoseJournal = PersistenceQuery.get(system)
    .getReadJournalFor(EventsBySliceFirehoseQuery.class,
        EventsBySliceFirehoseQuery.Identifier);

// Query events by slice
firehoseJournal
    .eventsBySlices(DomainEvent.class, "User", 0, 127, offset)
    .runForeach(envelope -> {
      System.out.println("Event: " + envelope.getEvent());
      System.out.println("Slice: " + envelope.slice());
    }, system);

// Load specific event
firehoseJournal
    .loadEnvelope(UserEvent.class, "user-123", 42L)
    .thenAccept(envelope -> {
      System.out.println("Loaded: " + envelope.getEvent());
    });
```

## Types

```scala { .api }
import akka.stream.OverflowStrategy
import java.time.Instant
import scala.concurrent.Future
import akka.stream.scaladsl.{Source, Sink}
import akka.NotUsed
import com.typesafe.config.Config
import akka.actor.ExtendedActorSystem

case class DomainEvent(eventType: String, data: Map[String, Any])
case class UserEvent(userId: String, eventType: String, data: Map[String, Any])
case class UserSnapshot(userId: String, state: Map[String, Any])
case class UserSnapshotRestored(userId: String, state: Map[String, Any])

// Custom backend interface
trait CustomJournalBackend {
  def loadEvent(persistenceId: String, sequenceNr: Long): Future[Option[Any]]
  def loadEventsByTag(tag: String, offset: Long, batchSize: Int): Future[List[EventEnvelope]]
  def loadPersistenceIds(afterId: Option[String], limit: Long): Future[List[String]]
}
```