or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

at-least-once-delivery.md · durable-state.md · event-adapters.md · index.md · journal-api.md · persistent-actors.md · plugin-development.md · snapshots.md

plugin-development.mddocs/

0

# Plugin Development

1

2

Comprehensive guide for developing custom journal and snapshot store plugins for Akka Persistence. This documentation covers the plugin API, implementation patterns, and configuration requirements.

3

4

## Capabilities

5

6

### Plugin Provider Interfaces

7

8

#### PersistencePlugin

9

10

Base interface for all persistence plugins.

11

12

```scala { .api }

13

/**
 * Common contract shared by all persistence plugins.
 *
 * A plugin exposes exactly one piece of information here: its identifier.
 */
trait PersistencePlugin {

  /** Identifier that uniquely names this plugin. */
  def pluginId: String
}

20

```

21

22

#### PersistencePluginProxy

23

24

Proxy for dynamically selecting persistence plugins.

25

26

```scala { .api }

27

/**
 * Actor that hands persistence traffic over to a target plugin picked through configuration.
 *
 * @param targetPluginConfig configuration used to select the target plugin
 */
class PersistencePluginProxy(targetPluginConfig: Config) extends Actor {

  /** Reference to the plugin actor that receives the delegated calls. */
  def targetPlugin: ActorRef

  /** Starts the target plugin with the given configuration. */
  def startTargetPlugin(): Unit
}

37

```

38

39

### Journal Plugin Development

40

41

#### AsyncWriteJournal Implementation

42

43

The plugin API that a custom journal implements by extending `AsyncWriteJournal`. A complete implementation example follows the listing.

44

45

```scala { .api }

46

/**
 * Trait to extend when implementing a custom journal plugin.
 *
 * All four plugin operations are asynchronous and report completion through a `Future`.
 */
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {

  /**
   * Plugin API: writes the given batch of messages asynchronously.
   *
   * @param messages the atomic writes to store
   * @return one `Try` per write, wrapped in a `Future`
   */
  def asyncWriteMessages(
      messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]

  /**
   * Plugin API: deletes messages of `persistenceId` up to `toSequenceNr` asynchronously.
   */
  def asyncDeleteMessagesTo(
      persistenceId: String,
      toSequenceNr: Long): Future[Unit]

  /**
   * Plugin API: replays stored messages during recovery.
   *
   * @param persistenceId    whose messages to replay
   * @param fromSequenceNr   lower sequence number bound
   * @param toSequenceNr     upper sequence number bound
   * @param max              maximum number of messages to replay
   * @param recoveryCallback invoked with each replayed message
   */
  def asyncReplayMessages(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long,
      max: Long)(
      recoveryCallback: PersistentRepr => Unit): Future[Unit]

  /**
   * Plugin API: reads the highest sequence number stored for `persistenceId`.
   */
  def asyncReadHighestSequenceNr(
      persistenceId: String,
      fromSequenceNr: Long): Future[Long]
}

68

```

69

70

**Implementation Example:**

71

72

```scala

73

import akka.persistence.journal.AsyncWriteJournal

74

import akka.persistence.{AtomicWrite, PersistentRepr}

75

import java.sql.{Connection, PreparedStatement}

76

import javax.sql.DataSource

77

78

/**
 * Example journal plugin that keeps events in a relational `journal` table over JDBC.
 *
 * Columns used: `persistence_id`, `sequence_nr`, `payload`, `manifest`, `deleted`.
 *
 * @param config the plugin's own configuration section (for example `my-journal`); Akka
 *               passes it to plugin classes whose constructor accepts a `Config`
 */
class DatabaseJournal(config: com.typesafe.config.Config) extends AsyncWriteJournal {

  import scala.collection.immutable
  import scala.concurrent.{ExecutionContext, Future}
  import scala.util.Try
  import scala.util.control.NonFatal

  // `Future { ... }` needs an ExecutionContext; use the dispatcher this actor runs on.
  // JDBC calls block, so that dispatcher should be sized for blocking work.
  private implicit val ec: ExecutionContext = context.dispatcher

  // Configuration and connection management
  private val dataSource: DataSource = createDataSource()

  // Upper bound on the number of rows sent to the database per executeBatch() call.
  private val batchSize: Int = config.getInt("batch-size")
  require(batchSize > 0, s"batch-size must be positive, got $batchSize")

  /**
   * Writes every [[AtomicWrite]] in its own database transaction.
   *
   * @return one `Try` per element of `messages`, in the same order; a `Failure` marks an
   *         `AtomicWrite` that was rolled back
   */
  override def asyncWriteMessages(
      messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
    Future {
      withConnection { connection =>
        val previousAutoCommit = connection.getAutoCommit
        // Explicit transactions are required so that an AtomicWrite is stored all-or-nothing.
        connection.setAutoCommit(false)
        try {
          withResource(connection.prepareStatement(
            "INSERT INTO journal (persistence_id, sequence_nr, payload, manifest) VALUES (?, ?, ?, ?)"
          )) { statement =>
            messages.map(atomicWrite => Try(writeAtomically(connection, statement, atomicWrite)))
          }
        } finally connection.setAutoCommit(previousAutoCommit)
      }
    }

  /** Marks all events of `persistenceId` up to and including `toSequenceNr` as deleted. */
  override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] =
    Future {
      withConnection { connection =>
        withResource(connection.prepareStatement(
          "UPDATE journal SET deleted = true WHERE persistence_id = ? AND sequence_nr <= ?"
        )) { statement =>
          statement.setString(1, persistenceId)
          statement.setLong(2, toSequenceNr)
          statement.executeUpdate()
          ()
        }
      }
    }

  /**
   * Replays at most `max` non-deleted events with a sequence number in
   * `[fromSequenceNr, toSequenceNr]`, in ascending order, through `recoveryCallback`.
   */
  override def asyncReplayMessages(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long,
      max: Long
  )(recoveryCallback: PersistentRepr => Unit): Future[Unit] =
    Future {
      withConnection { connection =>
        // Rows soft-deleted by asyncDeleteMessagesTo must not be replayed. COALESCE keeps
        // rows whose `deleted` column was never set.
        val sql =
          """SELECT sequence_nr, payload, manifest FROM journal
            |WHERE persistence_id = ? AND sequence_nr >= ? AND sequence_nr <= ?
            |  AND COALESCE(deleted, false) = false
            |ORDER BY sequence_nr LIMIT ?""".stripMargin

        withResource(connection.prepareStatement(sql)) { statement =>
          statement.setString(1, persistenceId)
          statement.setLong(2, fromSequenceNr)
          statement.setLong(3, toSequenceNr)
          statement.setLong(4, max)

          withResource(statement.executeQuery()) { rows =>
            while (rows.next()) {
              recoveryCallback(
                PersistentRepr(
                  payload = deserialize(rows.getBytes("payload")),
                  sequenceNr = rows.getLong("sequence_nr"),
                  persistenceId = persistenceId,
                  manifest = rows.getString("manifest")
                )
              )
            }
          }
        }
      }
    }

  /**
   * Returns the highest sequence number ever stored for `persistenceId`, or 0 if none.
   *
   * Soft-deleted rows are deliberately included so the number does not go backwards after
   * a deletion.
   */
  override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
    Future {
      withConnection { connection =>
        withResource(connection.prepareStatement(
          "SELECT MAX(sequence_nr) FROM journal WHERE persistence_id = ?"
        )) { statement =>
          statement.setString(1, persistenceId)
          withResource(statement.executeQuery()) { rows =>
            // getLong yields 0 for the SQL NULL that MAX returns when there are no rows.
            if (rows.next()) rows.getLong(1) else 0L
          }
        }
      }
    }

  // Helper methods

  /** Inserts all events of one AtomicWrite and commits; rolls back and rethrows on failure. */
  private def writeAtomically(
      connection: Connection,
      statement: PreparedStatement,
      atomicWrite: AtomicWrite): Unit =
    try {
      atomicWrite.payload.grouped(batchSize).foreach { chunk =>
        chunk.foreach { repr =>
          statement.setString(1, repr.persistenceId)
          statement.setLong(2, repr.sequenceNr)
          statement.setBytes(3, serialize(repr.payload))
          statement.setString(4, repr.manifest)
          statement.addBatch()
        }
        statement.executeBatch()
      }
      connection.commit()
    } catch {
      case NonFatal(cause) =>
        // Drop rows still queued on the shared statement so they cannot leak into the next write.
        statement.clearBatch()
        connection.rollback()
        throw cause
    }

  /** Runs `f` with a connection from the data source and always closes it afterwards. */
  private def withConnection[T](f: Connection => T): T =
    withResource(dataSource.getConnection())(f)

  /** Runs `f` with `resource` and closes the resource even when `f` throws. */
  private def withResource[R <: AutoCloseable, T](resource: R)(f: R => T): T =
    try f(resource)
    finally resource.close()

  private def serialize(obj: Any): Array[Byte] = ??? // Implement serialization
  private def deserialize(bytes: Array[Byte]): Any = ??? // Implement deserialization
  private def createDataSource(): DataSource = ??? // Create database connection pool
}

181

```

182

183

### Snapshot Store Plugin Development

184

185

#### SnapshotStore Implementation

186

187

Base trait for implementing snapshot store plugins.

188

189

```scala { .api }

190

/**
 * Trait to extend when implementing a custom snapshot store plugin.
 */
trait SnapshotStore extends Actor with ActorLogging {

  /**
   * Plugin API: loads a snapshot of `persistenceId` that matches `criteria`.
   *
   * @return the selected snapshot, or `None` when nothing matches
   */
  def loadAsync(
      persistenceId: String,
      criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]

  /** Plugin API: stores `snapshot` together with its `metadata`. */
  def saveAsync(
      metadata: SnapshotMetadata,
      snapshot: Any): Future[Unit]

  /** Plugin API: deletes the single snapshot identified by `metadata`. */
  def deleteAsync(metadata: SnapshotMetadata): Future[Unit]

  /** Plugin API: deletes every snapshot of `persistenceId` that matches `criteria`. */
  def deleteAsync(
      persistenceId: String,
      criteria: SnapshotSelectionCriteria): Future[Unit]

  /**
   * Plugin API: hook for additional, plugin-specific messages.
   * Handles nothing unless overridden.
   */
  def receivePluginInternal: Actor.Receive = Actor.emptyBehavior
}

210

```

211

212

**Implementation Example:**

213

214

```scala

215

import akka.persistence.snapshot.SnapshotStore

216

import akka.persistence.{SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria}

217

218

/**
 * Example snapshot store plugin that keeps snapshots in a relational `snapshot_store`
 * table over JDBC.
 *
 * Columns used: `persistence_id`, `sequence_nr`, `timestamp`, `snapshot_data`.
 */
class DatabaseSnapshotStore extends SnapshotStore {

  import java.sql.{Connection, PreparedStatement}
  import javax.sql.DataSource
  import scala.concurrent.{ExecutionContext, Future}

  // `Future { ... }` needs an ExecutionContext; use the dispatcher this actor runs on.
  // JDBC calls block, so that dispatcher should be sized for blocking work.
  private implicit val ec: ExecutionContext = context.dispatcher

  private val dataSource: DataSource = createDataSource()

  // Shared by load and criteria-based delete so both honour the full selection criteria,
  // including the lower bounds.
  private val criteriaWhereClause: String =
    "WHERE persistence_id = ? AND sequence_nr >= ? AND sequence_nr <= ? " +
      "AND timestamp >= ? AND timestamp <= ?"

  /**
   * Loads the newest snapshot of `persistenceId` inside the bounds given by `criteria`.
   *
   * @return the snapshot with the highest sequence number (ties broken by timestamp), or
   *         `None` when no stored snapshot matches
   */
  override def loadAsync(
      persistenceId: String,
      criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
    Future {
      withConnection { connection =>
        val sql =
          s"""SELECT sequence_nr, timestamp, snapshot_data FROM snapshot_store
             |$criteriaWhereClause
             |ORDER BY sequence_nr DESC, timestamp DESC LIMIT 1""".stripMargin

        withResource(connection.prepareStatement(sql)) { statement =>
          bindCriteria(statement, persistenceId, criteria)
          withResource(statement.executeQuery()) { rows =>
            if (rows.next()) {
              val metadata =
                SnapshotMetadata(persistenceId, rows.getLong("sequence_nr"), rows.getLong("timestamp"))
              Some(SelectedSnapshot(metadata, deserialize(rows.getBytes("snapshot_data"))))
            } else None
          }
        }
      }
    }

  /** Stores `snapshot` under the key taken from `metadata`. */
  override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
    Future {
      withConnection { connection =>
        withResource(connection.prepareStatement(
          "INSERT INTO snapshot_store (persistence_id, sequence_nr, timestamp, snapshot_data) VALUES (?, ?, ?, ?)"
        )) { statement =>
          statement.setString(1, metadata.persistenceId)
          statement.setLong(2, metadata.sequenceNr)
          statement.setLong(3, metadata.timestamp)
          statement.setBytes(4, serialize(snapshot))
          statement.executeUpdate()
          ()
        }
      }
    }

  /**
   * Deletes the snapshot identified by `metadata`.
   *
   * A timestamp of 0 means "not specified" (this is what `deleteSnapshot(sequenceNr)`
   * sends), so in that case every snapshot with the given sequence number is removed.
   */
  override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] =
    Future {
      withConnection { connection =>
        val matchTimestamp = metadata.timestamp != 0L
        val sql =
          "DELETE FROM snapshot_store WHERE persistence_id = ? AND sequence_nr = ?" +
            (if (matchTimestamp) " AND timestamp = ?" else "")

        withResource(connection.prepareStatement(sql)) { statement =>
          statement.setString(1, metadata.persistenceId)
          statement.setLong(2, metadata.sequenceNr)
          if (matchTimestamp) statement.setLong(3, metadata.timestamp)
          statement.executeUpdate()
          ()
        }
      }
    }

  /** Deletes every snapshot of `persistenceId` inside the bounds given by `criteria`. */
  override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
    Future {
      withConnection { connection =>
        withResource(connection.prepareStatement(
          s"DELETE FROM snapshot_store $criteriaWhereClause"
        )) { statement =>
          bindCriteria(statement, persistenceId, criteria)
          statement.executeUpdate()
          ()
        }
      }
    }

  // Helper methods

  /** Binds the five placeholders of `criteriaWhereClause`, in order. */
  private def bindCriteria(
      statement: PreparedStatement,
      persistenceId: String,
      criteria: SnapshotSelectionCriteria): Unit = {
    statement.setString(1, persistenceId)
    statement.setLong(2, criteria.minSequenceNr)
    statement.setLong(3, criteria.maxSequenceNr)
    statement.setLong(4, criteria.minTimestamp)
    statement.setLong(5, criteria.maxTimestamp)
  }

  /** Runs `f` with a connection from the data source and always closes it afterwards. */
  private def withConnection[T](f: Connection => T): T =
    withResource(dataSource.getConnection())(f)

  /** Runs `f` with `resource` and closes the resource even when `f` throws. */
  private def withResource[R <: AutoCloseable, T](resource: R)(f: R => T): T =
    try f(resource)
    finally resource.close()

  private def serialize(obj: Any): Array[Byte] = ??? // Implement serialization
  private def deserialize(bytes: Array[Byte]): Any = ??? // Implement deserialization
  private def createDataSource(): DataSource = ??? // Create database connection pool
}

291

```

292

293

### Java API Plugin Development

294

295

#### Java Snapshot Store

296

297

```scala { .api }

298

/**
 * Java API: base class for snapshot store plugins.
 *
 * Each operation reports completion through a `CompletionStage`.
 */
abstract class SnapshotStore extends akka.persistence.snapshot.SnapshotStore {

  /** Java API: loads a snapshot of `persistenceId` that matches `criteria`. */
  def doLoadAsync(
      persistenceId: String,
      criteria: SnapshotSelectionCriteria): CompletionStage[Optional[SelectedSnapshot]]

  /** Java API: stores `snapshot` together with its `metadata`. */
  def doSaveAsync(
      metadata: SnapshotMetadata,
      snapshot: Any): CompletionStage[Void]

  /** Java API: deletes the snapshot identified by `metadata`. */
  def doDeleteAsync(metadata: SnapshotMetadata): CompletionStage[Void]

  /** Java API: deletes the snapshots of `persistenceId` that match `criteria`. */
  def doDeleteAsync(
      persistenceId: String,
      criteria: SnapshotSelectionCriteria): CompletionStage[Void]
}

315

```

316

317

## Plugin Configuration

318

319

### Journal Plugin Configuration

320

321

```hocon

322

# Settings for the custom journal plugin
my-journal {
  # Fully qualified name of the plugin implementation
  class = "com.example.DatabaseJournal"

  # Settings read by the plugin itself
  connection-string = "jdbc:postgresql://localhost/akka_journal"
  username = "akka"
  password = "akka"
  batch-size = 100

  # Circuit breaker
  circuit-breaker.max-failures = 10
  circuit-breaker.call-timeout = 10s
  circuit-breaker.reset-timeout = 30s

  # Replay filter
  replay-filter.mode = "repair-by-discard-old"
  replay-filter.window-size = 100
  replay-filter.max-old-writers = 10
  replay-filter.debug = false

  # Connection pool
  connection-pool.initial-size = 5
  connection-pool.max-size = 20
  connection-pool.connection-timeout = 10s
}

# Select the custom journal as the journal plugin
akka.persistence.journal.plugin = "my-journal"

358

```

359

360

### Snapshot Store Configuration

361

362

```hocon

363

# Settings for the custom snapshot store plugin
my-snapshot-store {
  # Fully qualified name of the plugin implementation
  class = "com.example.DatabaseSnapshotStore"

  # Settings read by the plugin itself
  connection-string = "jdbc:postgresql://localhost/akka_snapshots"
  username = "akka"
  password = "akka"

  # Circuit breaker
  circuit-breaker.max-failures = 5
  circuit-breaker.call-timeout = 10s
  circuit-breaker.reset-timeout = 30s
}

# Select the custom snapshot store as the snapshot store plugin
akka.persistence.snapshot-store.plugin = "my-snapshot-store"

382

```

383

384

## Advanced Plugin Features

385

386

### Event Adapter Integration

387

388

```scala

389

/**
 * Journal sketch showing where event adapters sit in the write path.
 */
class CustomJournal extends AsyncWriteJournal {

  /**
   * Persists the given writes.
   *
   * `AsyncWriteJournal` runs `preparePersistentBatch` (inherited from `WriteJournalBase`)
   * on incoming writes before it calls this method, so `messages` already carries the
   * adapted payloads. Calling `preparePersistentBatch` again here would apply the event
   * adapters a second time, so the batch is handed to the backend unchanged.
   */
  override def asyncWriteMessages(
      messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
    writeToBackend(messages)
}

398

```

399

400

### Plugin Metrics and Monitoring

401

402

```scala

403

/**
 * Journal sketch that records write latency, the number of events written and the number
 * of failed writes.
 */
class MonitoredJournal extends AsyncWriteJournal {
  private val writeLatencyHistogram = registry.histogram("journal.write.latency")
  private val writeCounter = registry.counter("journal.write.count")
  private val errorCounter = registry.counter("journal.write.errors")

  /**
   * Delegates to `writeToBackend` and updates the metrics once the write completes.
   *
   * A completed `Future` can still contain rejected writes: each element of the result is
   * the outcome of the `AtomicWrite` at the same position. Rejected writes are counted as
   * errors, not as written events.
   */
  override def asyncWriteMessages(
      messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
    val startTime = System.nanoTime()

    writeToBackend(messages).andThen {
      case Success(results) =>
        writeLatencyHistogram.update(System.nanoTime() - startTime)
        if (results.isEmpty) {
          // An empty result is the shorthand for "every AtomicWrite succeeded".
          writeCounter.inc(messages.map(_.size).sum)
        } else {
          val (accepted, rejected) = messages.zip(results).partition { case (_, outcome) => outcome.isSuccess }
          writeCounter.inc(accepted.map { case (write, _) => write.size }.sum)
          if (rejected.nonEmpty) errorCounter.inc(rejected.size)
        }
      case Failure(_) =>
        // The whole batch failed, for example because the backend was unreachable.
        errorCounter.inc()
    }
  }
}

420

```

421

422

### Plugin Migration Support

423

424

```scala

425

// Sketch of a journal that asks a new journal for events first and falls back to an old one.
// Both journals are started as child actors of this journal.
// NOTE(review): illustrative only as written — see the review notes inside asyncReplayMessages.
class MigrationJournal extends AsyncWriteJournal {

426

private val oldJournal: ActorRef = context.actorOf(Props[OldJournalPlugin])

427

private val newJournal: ActorRef = context.actorOf(Props[NewJournalPlugin])

428

429

// Replays by asking `newJournal`; when its reply is empty, asks `oldJournal` instead.
// NOTE(review): `recoveryCallback` is never invoked in this body, so no replayed event is
// handed back to the caller — confirm how events are meant to be delivered.
override def asyncReplayMessages(

430

persistenceId: String,

431

fromSequenceNr: Long,

432

toSequenceNr: Long,

433

max: Long

434

)(recoveryCallback: PersistentRepr => Unit): Future[Unit] = {

435

// Try new storage first, fallback to old storage

436

// NOTE(review): `ReplayMessages` looks like Akka's internal journal protocol message,
// which presumably also takes the recovering actor's ref — TODO confirm it can be used here.
newJournal.ask(ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId)).flatMap {

437

// NOTE(review): the reply is matched with `.nonEmpty`, so a collection-like reply is
// presumably expected — verify what the child journals actually answer.
case messages if messages.nonEmpty => Future.successful(())

438

// NOTE(review): this branch yields whatever `oldJournal.ask` returns — confirm it fits
// the declared Future[Unit].
case _ => oldJournal.ask(ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId))

439

}

440

}

441

}

442

```

443

444

## Testing Plugin Implementations

445

446

### Journal Plugin Testing

447

448

```scala

449

import akka.persistence.journal.JournalSpec

450

451

/**
 * Runs the journal compliance suite (`JournalSpec`) against the plugin registered under
 * the `custom-journal` configuration path.
 */
class CustomJournalSpec extends JournalSpec(
  config = ConfigFactory.parseString(
    """
    akka.persistence.journal.plugin = "custom-journal"
    custom-journal {
      class = "com.example.CustomJournal"
      # Test configuration
    }
    """
  )
) {

  // JournalSpec leaves this capability flag abstract, so the spec needs it to compile.
  // Switch it on if the journal rejects events that cannot be serialized.
  override protected def supportsRejectingNonSerializableObjects: akka.persistence.CapabilityFlag =
    akka.persistence.CapabilityFlag.off()

  "Custom journal" should {
    "pass all journal compliance tests" in {
      // Tests provided by JournalSpec
    }
  }
}

468

```

469

470

### Snapshot Store Testing

471

472

```scala

473

import akka.persistence.snapshot.SnapshotStoreSpec

474

475

/**
 * Runs the snapshot store compliance suite (`SnapshotStoreSpec`) against the plugin
 * registered under the `custom-snapshot-store` configuration path.
 */
class CustomSnapshotStoreSpec
    extends SnapshotStoreSpec(
      config = ConfigFactory.parseString(
        """
        akka.persistence.snapshot-store.plugin = "custom-snapshot-store"
        custom-snapshot-store {
          class = "com.example.CustomSnapshotStore"
          # Test configuration
        }
        """
      )
    ) {

  "Custom snapshot store" should {
    "pass all snapshot store compliance tests" in {
      // The compliance tests themselves are inherited from SnapshotStoreSpec.
    }
  }
}

492

```

493

494

## Plugin Deployment

495

496

### Packaging with sbt

497

498

```scala

499

// build.sbt
ThisBuild / organization := "com.example"
ThisBuild / scalaVersion := "2.13.10"

lazy val akkaVersion = "2.8.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion % Test,
  // Plugin TCK: provides JournalSpec and SnapshotStoreSpec used by the compliance tests
  "com.typesafe.akka" %% "akka-persistence-tck" % akkaVersion % Test
)

// Make plugin discoverable: reference.conf must be on the classpath of the packaged jar.
// Slash syntax is used here to match the ThisBuild settings above.
Compile / resourceDirectories += (Compile / sourceDirectory).value / "resources"

512

```

513

514

### Reference Configuration

515

516

```hocon

517

# reference.conf - defaults shipped with the plugin

# Journal defaults
my-persistence-plugin.journal {
  class = "com.example.MyJournal"
  # Default settings
}

# Snapshot store defaults
my-persistence-plugin.snapshot-store {
  class = "com.example.MySnapshotStore"
  # Default settings
}

529

```

530

531

## Performance Optimization

532

533

### Batching Strategies

534

535

```scala

536

/**
 * Journal sketch that groups incoming writes into batches before storing them.
 */
class BatchingJournal extends AsyncWriteJournal {
  private val batcher = new MessageBatcher(batchSize = 100, flushInterval = 50.millis)

  /**
   * Hands `messages` to the batcher and writes every batch it returns to storage.
   * Batching several writes together is done for better throughput.
   */
  override def asyncWriteMessages(
      messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
    for (readyBatches <- batcher.addBatch(messages))
      yield readyBatches.flatMap(writeBatchToStorage)
}

546

```

547

548

### Connection Management

549

550

```scala

551

/**
 * Journal sketch that borrows its database connections from a pool.
 */
class PooledJournal extends AsyncWriteJournal {
  private val connectionPool = new HikariConnectionPool(config)

  /** Closes the pool when the actor stops, then runs the inherited stop hook. */
  override def postStop(): Unit = {
    connectionPool.close()
    super.postStop()
  }

  /**
   * Runs `f` inside a `Future` with a connection taken from the pool.
   * The connection is closed again whether or not `f` throws.
   */
  private def withConnection[T](f: Connection => T): Future[T] =
    Future {
      val pooled = connectionPool.getConnection()
      try {
        f(pooled)
      } finally {
        pooled.close()
      }
    }
}

567

```