or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-com-typesafe-akka--akka-persistence-2-13

Event sourcing library for Akka persistence providing comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/com.typesafe.akka/akka-persistence_2.13@2.8.x

To install, run

npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-persistence-2-13@2.8.0

0

# Akka Persistence

1

2

Akka Persistence provides comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model. It enables actors to persist their internal state through event logging, allowing them to recover from failures by replaying events. The library supports multiple persistence backends through pluggable journal and snapshot store implementations, and includes specialized persistence patterns like persistent FSMs and persistent views.

3

4

## Package Information

5

6

- **Package Name**: com.typesafe.akka:akka-persistence_2.13

7

- **Package Type**: maven

8

- **Language**: Scala

9

- **Installation**: `libraryDependencies += "com.typesafe.akka" %% "akka-persistence" % "2.8.8"`

10

11

## Core Imports

12

13

```scala

14

import akka.persistence._

15

import akka.persistence.journal._

16

import akka.persistence.snapshot._

17

import akka.actor.ActorSystem

18

import scala.concurrent.Future

19

```

20

21

For specific components:

22

23

```scala

24

import akka.persistence.{PersistentActor, AtLeastOnceDelivery}

25

import akka.persistence.fsm.PersistentFSM

26

import akka.persistence.state.scaladsl.DurableStateStore

27

import akka.persistence.journal.{AsyncWriteJournal, AsyncRecovery}

28

import akka.persistence.snapshot.SnapshotStore

29

import akka.Done

30

```

31

32

## Basic Usage

33

34

```scala

35

import akka.persistence._

36

import akka.actor.{ActorSystem, Props}

37

38

// Basic persistent actor

39

class BankAccount extends PersistentActor {

40

override def persistenceId: String = "bank-account-1"

41

42

var balance: Double = 0.0

43

44

override def receiveRecover: Receive = {

45

case evt: TransactionEvent => updateState(evt)

46

case SnapshotOffer(_, snapshot: Double) => balance = snapshot

47

}

48

49

override def receiveCommand: Receive = {

50

case Deposit(amount) =>

51

persist(Deposited(amount)) { evt =>

52

updateState(evt)

53

sender() ! s"Deposited $amount, balance: $balance"

54

}

55

case Withdraw(amount) if balance >= amount =>

56

persist(Withdrawn(amount)) { evt =>

57

updateState(evt)

58

sender() ! s"Withdrawn $amount, balance: $balance"

59

}

60

case GetBalance => sender() ! balance

61

}

62

63

def updateState(event: TransactionEvent): Unit = event match {

64

case Deposited(amount) => balance += amount

65

case Withdrawn(amount) => balance -= amount

66

}

67

}

68

69

// Usage

70

implicit val system = ActorSystem("bank-system")

71

val bankAccount = system.actorOf(Props[BankAccount], "bank-account")

72

bankAccount ! Deposit(100.0)

73

```

74

75

## Architecture

76

77

Akka Persistence is built around several key components:

78

79

- **Persistent Actors**: Event-sourced actors that persist state changes as events and can recover from failures

80

- **Journal**: Pluggable storage backend for events with support for multiple implementations (in-memory, file-based, database)

81

- **Snapshot Store**: Optional storage for actor state snapshots to optimize recovery performance

82

- **Event Adapters**: Transform events between domain and storage formats for schema evolution

83

- **Recovery System**: Configurable recovery process that replays events and restores snapshots during actor initialization

84

- **At-Least-Once Delivery**: Reliable message delivery with automatic redelivery and confirmation tracking

85

86

## Capabilities

87

88

### Persistent Actors

89

90

Core persistent actor functionality for event sourcing patterns with automatic state recovery and configurable persistence backends.

91

92

```scala { .api }

93

trait PersistentActor extends Eventsourced with PersistenceStash with PersistenceIdentity with PersistenceRecovery

94

95

def persist[A](event: A)(handler: A => Unit): Unit

96

def persistAll[A](events: immutable.Seq[A])(handler: A => Unit): Unit

97

def persistAsync[A](event: A)(handler: A => Unit): Unit

98

def persistAllAsync[A](events: immutable.Seq[A])(handler: A => Unit): Unit

99

```

100

101

[Persistent Actors](./persistent-actors.md)

102

103

### Snapshot Management

104

105

State snapshot functionality for optimizing recovery performance and managing large event histories.

106

107

```scala { .api }

108

trait Snapshotter extends Actor {

109

def saveSnapshot(snapshot: Any): Unit

110

def deleteSnapshot(sequenceNr: Long): Unit

111

def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit

112

}

113

114

case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)

115

case class SnapshotSelectionCriteria(

116

maxSequenceNr: Long = Long.MaxValue,

117

maxTimestamp: Long = Long.MaxValue,

118

minSequenceNr: Long = 0L,

119

minTimestamp: Long = 0L

120

)

121

```

122

123

[Snapshot Management](./snapshots.md)

124

125

### Event Adapters

126

127

Event transformation system for schema evolution and event format conversion between domain and journal representations.

128

129

```scala { .api }

130

trait EventAdapter extends WriteEventAdapter with ReadEventAdapter

131

132

trait WriteEventAdapter {

133

def manifest(event: Any): String

134

def toJournal(event: Any): Any

135

}

136

137

trait ReadEventAdapter {

138

def fromJournal(event: Any, manifest: String): EventSeq

139

}

140

```

141

142

[Event Adapters](./event-adapters.md)

143

144

### At-Least-Once Delivery

145

146

Reliable message delivery with automatic redelivery, confirmation tracking, and configurable retry policies.

147

148

```scala { .api }

149

trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike {

150

def deliver(destination: ActorPath)(deliveryIdToMessage: Long => Any): Unit

151

def confirmDelivery(deliveryId: Long): Boolean

152

def numberOfUnconfirmed: Int

153

}

154

```

155

156

[At-Least-Once Delivery](./at-least-once-delivery.md)

157

158

### Durable State Management

159

160

Durable state storage for mutable state management with revision tracking and pluggable storage backends.

161

162

```scala { .api }

163

trait DurableStateStore[A] {

164

def getObject(persistenceId: String): Future[GetObjectResult[A]]

165

}

166

167

case class GetObjectResult[A](value: Option[A], revision: Long)

168

```

169

170

[Durable State](./durable-state.md)

171

172

### Journal API

173

174

Journal plugin development interfaces for implementing custom persistence backends with asynchronous write capabilities.

175

176

```scala { .api }

177

import scala.util.Try

178

179

trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {

180

def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]

181

def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]

182

}

183

184

trait AsyncRecovery {

185

def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(recoveryCallback: PersistentRepr => Unit): Future[Unit]

186

def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]

187

}

188

```

189

190

[Journal API](./journal-api.md)

191

192

### Plugin Development

193

194

Comprehensive plugin development APIs for custom journal and snapshot store implementations with testing and deployment guides.

195

196

```scala { .api }

197

trait SnapshotStore extends Actor with ActorLogging {

198

def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]

199

def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]

200

def deleteAsync(metadata: SnapshotMetadata): Future[Unit]

201

def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]

202

}

203

```

204

205

[Plugin Development](./plugin-development.md)

206

207

## Common Types

208

209

```scala { .api }

210

// Core persistence types

211

trait PersistentRepr extends Message {

212

/** The event payload */

213

def payload: Any

214

215

/** Event adapter manifest */

216

def manifest: String

217

218

/** Persistent actor ID */

219

def persistenceId: String

220

221

/** Sequence number */

222

def sequenceNr: Long

223

224

/** Storage timestamp */

225

def timestamp: Long

226

227

/** Optional metadata */

228

def metadata: Option[Any]

229

230

/** Writer unique identifier */

231

def writerUuid: String

232

233

/** Deletion flag (deprecated) */

234

def deleted: Boolean

235

236

/** Original sender (deprecated) */

237

def sender: ActorRef

238

239

/** Create new persistent repr with payload */

240

def withPayload(payload: Any): PersistentRepr

241

242

/** Create new persistent repr with manifest */

243

def withManifest(manifest: String): PersistentRepr

244

245

/** Create new persistent repr with timestamp */

246

def withTimestamp(newTimestamp: Long): PersistentRepr

247

248

/** Create new persistent repr with metadata */

249

def withMetadata(metadata: Any): PersistentRepr

250

251

/** Create updated copy */

252

def update(

253

sequenceNr: Long = sequenceNr,

254

persistenceId: String = persistenceId,

255

deleted: Boolean = deleted,

256

sender: ActorRef = sender,

257

writerUuid: String = writerUuid

258

): PersistentRepr

259

}

260

261

object PersistentRepr {

262

/** Plugin API: value of undefined persistenceId or manifest */

263

val Undefined = ""

264

265

/** Plugin API factory method */

266

def apply(

267

payload: Any,

268

sequenceNr: Long = 0L,

269

persistenceId: String = PersistentRepr.Undefined,

270

manifest: String = PersistentRepr.Undefined,

271

deleted: Boolean = false,

272

sender: ActorRef = null,

273

writerUuid: String = PersistentRepr.Undefined

274

): PersistentRepr

275

}

276

277

case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) {

278

/** Persistence ID from first message */

279

def persistenceId: String

280

281

/** Lowest sequence number in batch */

282

def lowestSequenceNr: Long

283

284

/** Highest sequence number in batch */

285

def highestSequenceNr: Long

286

287

/** Number of messages */

288

def size: Int

289

}

290

291

// Recovery configuration

292

case class Recovery(

293

fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,

294

toSequenceNr: Long = Long.MaxValue,

295

replayMax: Long = Long.MaxValue

296

)

297

298

case object RecoveryCompleted

299

300

// Snapshot types

301

case class SnapshotMetadata(

302

persistenceId: String,

303

sequenceNr: Long,

304

timestamp: Long = 0L,

305

metadata: Option[Any] = None

306

)

307

308

// Journal response messages

309

case class DeleteMessagesSuccess(toSequenceNr: Long)

310

case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long)

311

312

// Stash overflow strategies

313

sealed trait StashOverflowStrategy

314

case object DiscardToDeadLetterStrategy extends StashOverflowStrategy

315

case object ThrowOverflowExceptionStrategy extends StashOverflowStrategy

316

case class ReplyToStrategy(response: Any) extends StashOverflowStrategy

317

```