or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

at-least-once-delivery.mddurable-state.mdevent-adapters.mdindex.mdjournal-api.mdpersistent-actors.mdplugin-development.mdsnapshots.md

journal-api.mddocs/

0

# Journal API

1

2

Journal plugin development interfaces for implementing custom persistence backends. These APIs enable developers to create journal plugins that store and retrieve persistent messages with various storage systems.

3

4

## Capabilities

5

6

### AsyncWriteJournal

7

8

Base trait for implementing asynchronous, non-blocking journal plugins.

9

10

```scala { .api }

11

/**

12

* Abstract journal optimized for asynchronous, non-blocking writes

13

*/

14

trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {

15

16

/**

17

* Plugin API: Asynchronously write messages to the journal.

18

* The returned future must be completed when all AtomicWrite operations are finished.

19

* The returned sequence must have the same size as the input sequence and must contain results

20

* for each AtomicWrite in the same order.

21

*/

22

def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]

23

24

/**

25

* Plugin API: Asynchronously delete messages up to the given sequence number.

26

* If `permanent` is false, messages are marked as deleted but not physically removed.

27

*/

28

def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]

29

}

30

```

31

32

**Usage Examples:**

33

34

```scala

35

import akka.persistence.journal.AsyncWriteJournal

36

import akka.persistence.{AtomicWrite, PersistentRepr}

37

import scala.concurrent.Future

38

import scala.util.{Try, Success, Failure}

39

40

class CustomJournal extends AsyncWriteJournal {

41

42

override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {

43

// Custom storage implementation

44

Future.traverse(messages) { atomicWrite =>

45

val persistenceId = atomicWrite.persistenceId

46

val batch = atomicWrite.payload

47

48

// Store messages in your backend

49

storeMessages(persistenceId, batch).map(_ => Success(())).recover {

50

case ex => Failure(ex)

51

}

52

}

53

}

54

55

override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {

56

// Mark messages as deleted in your backend

57

markMessagesDeleted(persistenceId, toSequenceNr)

58

}

59

60

override def asyncReplayMessages(

61

persistenceId: String,

62

fromSequenceNr: Long,

63

toSequenceNr: Long,

64

max: Long

65

)(recoveryCallback: PersistentRepr => Unit): Future[Unit] = {

66

// Replay messages from your backend

67

replayFromStore(persistenceId, fromSequenceNr, toSequenceNr, max) { message =>

68

recoveryCallback(message)

69

}

70

}

71

72

override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {

73

// Return highest sequence number from your backend

74

getHighestSequenceNr(persistenceId)

75

}

76

77

// Custom implementation methods

78

private def storeMessages(persistenceId: String, messages: immutable.Seq[PersistentRepr]): Future[Unit] = ???

79

private def markMessagesDeleted(persistenceId: String, toSequenceNr: Long): Future[Unit] = ???

80

private def replayFromStore(persistenceId: String, from: Long, to: Long, max: Long)(callback: PersistentRepr => Unit): Future[Unit] = ???

81

private def getHighestSequenceNr(persistenceId: String): Future[Long] = ???

82

}

83

```

84

85

### AsyncRecovery

86

87

Interface for asynchronous message replay and sequence number recovery.

88

89

```scala { .api }

90

/**

91

* Asynchronous message replay and sequence number recovery interface

92

*/

93

trait AsyncRecovery {

94

95

/**

96

* Plugin API: Asynchronously replay persistent messages by calling replayCallback.

97

* Must complete when all messages matching sequence number bounds have been replayed.

98

*/

99

def asyncReplayMessages(

100

persistenceId: String,

101

fromSequenceNr: Long,

102

toSequenceNr: Long,

103

max: Long

104

)(recoveryCallback: PersistentRepr => Unit): Future[Unit]

105

106

/**

107

* Plugin API: Asynchronously read the highest stored sequence number.

108

* Used by persistent actors to determine starting point for new events.

109

*/

110

def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]

111

}

112

```

113

114

### WriteJournalBase

115

116

Base functionality for journal implementations including event adapter integration.

117

118

```scala { .api }

119

/**

120

* Base trait providing common journal functionality

121

*/

122

private[akka] trait WriteJournalBase {

123

this: Actor =>

124

125

/** Prepare batch of persistent messages for storage */

126

protected def preparePersistentBatch(rb: immutable.Seq[PersistentEnvelope]): immutable.Seq[AtomicWrite]

127

}

128

```

129

130

### SyncWriteJournal (Deprecated)

131

132

Synchronous journal interface - deprecated in favor of AsyncWriteJournal.

133

134

```scala { .api }

135

/**

136

* Synchronous write journal - DEPRECATED

137

* Use AsyncWriteJournal instead for better performance

138

*/

139

@deprecated("Use AsyncWriteJournal instead", "2.3.4")

140

trait SyncWriteJournal extends Actor with WriteJournalBase with SyncRecovery {

141

142

/** Synchronously write messages - blocks calling thread */

143

def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]]

144

145

/** Synchronously delete messages - blocks calling thread */

146

def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit

147

}

148

```

149

150

## Journal Message Types

151

152

### AtomicWrite

153

154

Container for a batch of persistent messages that must be written atomically.

155

156

```scala { .api }

157

/**

158

* Atomic write operation containing messages for single persistence ID

159

*/

160

case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) {

161

/** Persistence ID from first message */

162

def persistenceId: String = payload.head.persistenceId

163

164

/** Lowest sequence number in batch */

165

def lowestSequenceNr: Long = payload.head.sequenceNr

166

167

/** Highest sequence number in batch */

168

def highestSequenceNr: Long = payload.last.sequenceNr

169

170

/** Number of messages in batch */

171

def size: Int = payload.size

172

}

173

```

174

175

### WriteMessages

176

177

Journal protocol message for writing persistent messages.

178

179

```scala { .api }

180

/**

181

* Request to write messages to journal

182

*/

183

case class WriteMessages(

184

messages: immutable.Seq[PersistentEnvelope],

185

persistentActor: ActorRef,

186

actorInstanceId: Int

187

)

188

```

189

190

### ReplayMessages

191

192

Journal protocol message for replaying messages during recovery.

193

194

```scala { .api }

195

/**

196

* Request to replay messages from journal

197

*/

198

case class ReplayMessages(

199

fromSequenceNr: Long,

200

toSequenceNr: Long,

201

max: Long,

202

persistenceId: String,

203

persistentActor: ActorRef

204

)

205

```

206

207

## Error Handling

208

209

### Journal Failures

210

211

```scala { .api }

212

/** Exception indicating journal operation failure */

213

case class JournalFailureException(cause: Throwable) extends RuntimeException(cause)

214

215

/** Response indicating write message failure */

216

case class WriteMessageFailure(cause: Throwable, sequenceNr: Long)

217

218

/** Response indicating replay message failure */

219

case class ReplayMessagesFailure(cause: Throwable)

220

```

221

222

## Plugin Configuration

223

224

Journal plugins are configured in application.conf:

225

226

```hocon

227

akka.persistence.journal {

228

plugin = "custom-journal"

229

230

# Custom journal configuration

231

custom-journal {

232

class = "com.example.CustomJournal"

233

234

# Plugin-specific settings

235

connection-string = "jdbc:postgresql://localhost/events"

236

batch-size = 100

237

238

# Circuit breaker settings

239

circuit-breaker {

240

max-failures = 10

241

call-timeout = 10s

242

reset-timeout = 30s

243

}

244

245

# Replay filter settings

246

replay-filter {

247

mode = "repair-by-discard-old"

248

window-size = 100

249

max-old-writers = 10

250

debug = false

251

}

252

}

253

}

254

```

255

256

## Advanced Features

257

258

### Circuit Breaker Integration

259

260

AsyncWriteJournal includes built-in circuit breaker protection:

261

262

```scala

263

// Circuit breaker configuration

264

circuit-breaker {

265

max-failures = 10 # Number of failures before opening circuit

266

call-timeout = 10s # Timeout for journal operations

267

reset-timeout = 30s # Time before attempting to close circuit

268

}

269

```

270

271

### Replay Filter

272

273

Filters out corrupt or duplicate messages during replay:

274

275

```scala

276

// Replay filter modes

277

replay-filter {

278

mode = "repair-by-discard-old" # repair-by-discard-old, fail, warn, off

279

window-size = 100 # Size of duplicate detection window

280

max-old-writers = 10 # Maximum old writers to track

281

}

282

```

283

284

### Performance Optimization

285

286

```scala

287

class OptimizedJournal extends AsyncWriteJournal {

288

289

override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {

290

// Batch multiple AtomicWrites for better throughput

291

val batches = messages.grouped(batchSize).toSeq

292

293

Future.traverse(batches) { batch =>

294

// Write batch to storage with single I/O operation

295

writeBatchToStorage(batch)

296

}.map(_.flatten)

297

}

298

299

// Use connection pooling and prepared statements

300

private def writeBatchToStorage(batch: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {

301

connectionPool.withConnection { connection =>

302

val preparedStatement = connection.prepareStatement(insertSQL)

303

// Batch insert for better performance

304

batch.foreach { atomicWrite =>

305

atomicWrite.payload.foreach { repr =>

306

preparedStatement.setString(1, repr.persistenceId)

307

preparedStatement.setLong(2, repr.sequenceNr)

308

preparedStatement.setBytes(3, serialize(repr))

309

preparedStatement.addBatch()

310

}

311

}

312

preparedStatement.executeBatch()

313

}

314

}

315

}

316

```

317

318

## Testing Journal Plugins

319

320

```scala

321

import akka.persistence.journal.JournalSpec

322

323

class CustomJournalSpec extends JournalSpec(config = ConfigFactory.parseString("""

324

akka.persistence.journal.plugin = "custom-journal"

325

custom-journal {

326

class = "com.example.CustomJournal"

327

# Test configuration

328

}

329

""")) {

330

331

"Custom journal" should {

332

"write and replay messages" in {

333

// Test cases provided by JournalSpec

334

}

335

336

"handle concurrent writes" in {

337

// Custom test cases

338

}

339

}

340

}

341

```

342

343

## Migration from Sync to Async

344

345

For migrating from SyncWriteJournal to AsyncWriteJournal:

346

347

```scala

348

// Old sync implementation

349

class OldSyncJournal extends SyncWriteJournal {

350

def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] = {

351

// Blocking operations

352

messages.map(syncWrite)

353

}

354

}

355

356

// New async implementation

357

class NewAsyncJournal extends AsyncWriteJournal {

358

def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {

359

// Non-blocking operations

360

Future.traverse(messages)(asyncWrite)

361

}

362

}

363

```