Event sourcing library for Akka persistence providing comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model.
npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-persistence-2-13@2.8.0
# Akka Persistence
1
2
Akka Persistence provides comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model. It enables actors to persist their internal state through event logging, allowing them to recover from failures by replaying events. The library supports multiple persistence backends through pluggable journal and snapshot store implementations, and includes specialized persistence patterns such as persistent FSMs, at-least-once delivery, and durable state.
3
4
## Package Information
5
6
- **Package Name**: com.typesafe.akka:akka-persistence_2.13
7
- **Package Type**: maven
8
- **Language**: Scala
9
- **Installation**: `libraryDependencies += "com.typesafe.akka" %% "akka-persistence" % "2.8.8"`
10
11
## Core Imports
12
13
```scala
14
import akka.persistence._
15
import akka.persistence.journal._
16
import akka.persistence.snapshot._
17
import akka.actor.ActorSystem
18
import scala.concurrent.Future
19
```
20
21
For specific components:
22
23
```scala
24
import akka.persistence.{PersistentActor, AtLeastOnceDelivery}
25
import akka.persistence.fsm.PersistentFSM
26
import akka.persistence.state.scaladsl.DurableStateStore
27
import akka.persistence.journal.{AsyncWriteJournal, AsyncRecovery}
28
import akka.persistence.snapshot.SnapshotStore
29
import akka.Done
30
```
31
32
## Basic Usage
33
34
```scala
35
import akka.persistence._
36
import akka.actor.{ActorSystem, Props}
37
38
// Commands accepted by the account actor.
sealed trait AccountCommand
final case class Deposit(amount: Double) extends AccountCommand
final case class Withdraw(amount: Double) extends AccountCommand
case object GetBalance extends AccountCommand

// Events written to the journal; replaying them rebuilds `balance`.
sealed trait TransactionEvent
final case class Deposited(amount: Double) extends TransactionEvent
final case class Withdrawn(amount: Double) extends TransactionEvent

/**
 * Basic persistent actor: every accepted command is first persisted as an
 * event, and the in-memory balance is only changed from the event handler.
 */
class BankAccount extends PersistentActor {
  override def persistenceId: String = "bank-account-1"

  // Only ever touched from this actor's own message and event handlers.
  var balance: Double = 0.0

  /** Recovery: rebuild the balance from a snapshot offer and/or replayed events. */
  override def receiveRecover: Receive = {
    case evt: TransactionEvent              => updateState(evt)
    case SnapshotOffer(_, snapshot: Double) => balance = snapshot
  }

  /** Normal operation: validate the command, persist the event, then reply. */
  override def receiveCommand: Receive = {
    case Deposit(amount) =>
      persist(Deposited(amount)) { evt =>
        updateState(evt)
        sender() ! s"Deposited $amount, balance: $balance"
      }
    case Withdraw(amount) if balance >= amount =>
      persist(Withdrawn(amount)) { evt =>
        updateState(evt)
        sender() ! s"Withdrawn $amount, balance: $balance"
      }
    case Withdraw(amount) =>
      // Nothing is persisted for a rejected withdrawal; reply explicitly so the
      // caller is not left waiting on a message that would otherwise be unhandled.
      sender() ! s"Insufficient funds: requested $amount, balance: $balance"
    case GetBalance => sender() ! balance
  }

  /** Applies a single event to the in-memory state (used live and during recovery). */
  def updateState(event: TransactionEvent): Unit = event match {
    case Deposited(amount) => balance += amount
    case Withdrawn(amount) => balance -= amount
  }
}

// Usage
implicit val system: ActorSystem = ActorSystem("bank-system")
val bankAccount = system.actorOf(Props[BankAccount](), "bank-account")
bankAccount ! Deposit(100.0)
73
```
74
75
## Architecture
76
77
Akka Persistence is built around several key components:
78
79
- **Persistent Actors**: Event-sourced actors that persist state changes as events and can recover from failures
80
- **Journal**: Pluggable storage backend for events with support for multiple implementations (in-memory, file-based, database)
81
- **Snapshot Store**: Optional storage for actor state snapshots to optimize recovery performance
82
- **Event Adapters**: Transform events between domain and storage formats for schema evolution
83
- **Recovery System**: Configurable recovery process that replays events and restores snapshots during actor initialization
84
- **At-Least-Once Delivery**: Reliable message delivery with automatic redelivery and confirmation tracking
85
86
## Capabilities
87
88
### Persistent Actors
89
90
Core persistent actor functionality for event sourcing patterns with automatic state recovery and configurable persistence backends.
91
92
```scala { .api }
93
/** Base trait for event-sourced actors. */
trait PersistentActor extends Eventsourced with PersistenceStash with PersistenceIdentity with PersistenceRecovery {

  /**
   * Persists `event` and invokes `handler` with it once it has been stored.
   * Incoming commands are stashed until the handler has run.
   */
  def persist[A](event: A)(handler: A => Unit): Unit

  /**
   * Persists `events` atomically, in the given order; `handler` is invoked
   * once per persisted event.
   */
  def persistAll[A](events: immutable.Seq[A])(handler: A => Unit): Unit

  /**
   * Like `persist`, but the actor keeps receiving commands between this call
   * and the execution of `handler`.
   */
  def persistAsync[A](event: A)(handler: A => Unit): Unit

  /** Batch variant of `persistAsync`; `handler` is invoked once per persisted event. */
  def persistAllAsync[A](events: immutable.Seq[A])(handler: A => Unit): Unit
}
99
```
100
101
[Persistent Actors](./persistent-actors.md)
102
103
### Snapshot Management
104
105
State snapshot functionality for optimizing recovery performance and managing large event histories.
106
107
```scala { .api }
108
/** Snapshot API mixed into actors that save and delete snapshots of their state. */
trait Snapshotter extends Actor {

  /** Saves `snapshot` as a snapshot of this actor's state. */
  def saveSnapshot(snapshot: Any): Unit

  /** Deletes the snapshot identified by `sequenceNr`. */
  def deleteSnapshot(sequenceNr: Long): Unit

  /** Deletes all snapshots matching `criteria`. */
  def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit
}
113
114
/** Offered to a persistent actor during recovery: a stored `snapshot` together with its `metadata`. */
final case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
115
/**
 * Bounds used to select snapshots by sequence number and timestamp.
 * With the defaults, no snapshot is excluded.
 *
 * @param maxSequenceNr upper bound for a selected snapshot's sequence number
 * @param maxTimestamp  upper bound for a selected snapshot's timestamp
 * @param minSequenceNr lower bound for a selected snapshot's sequence number
 * @param minTimestamp  lower bound for a selected snapshot's timestamp
 */
final case class SnapshotSelectionCriteria(
  maxSequenceNr: Long = Long.MaxValue,
  maxTimestamp: Long = Long.MaxValue,
  minSequenceNr: Long = 0L,
  minTimestamp: Long = 0L
)
121
```
122
123
[Snapshot Management](./snapshots.md)
124
125
### Event Adapters
126
127
Event transformation system for schema evolution and event format conversion between domain and journal representations.
128
129
```scala { .api }
130
/** Write side: converts domain events into their journal representation. */
trait WriteEventAdapter {

  /** Manifest (type hint) stored alongside `event`. */
  def manifest(event: Any): String

  /** Converts a domain `event` into the object that is written to the journal. */
  def toJournal(event: Any): Any
}

/** Read side: converts journal events back into domain events. */
trait ReadEventAdapter {

  /**
   * Converts a journal `event`, using the `manifest` it was stored with,
   * into a sequence of domain events.
   */
  def fromJournal(event: Any, manifest: String): EventSeq
}

/** Adapter that handles both directions. */
trait EventAdapter extends WriteEventAdapter with ReadEventAdapter
140
```
141
142
[Event Adapters](./event-adapters.md)
143
144
### At-Least-Once Delivery
145
146
Reliable message delivery with automatic redelivery, confirmation tracking, and configurable retry policies.
147
148
```scala { .api }
149
/** Mix-in for persistent actors that need at-least-once message delivery. */
trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike {

  /**
   * Sends the message built by `deliveryIdToMessage` to `destination`.
   * The function receives the delivery id assigned to this send; the message
   * is redelivered until that id is confirmed.
   */
  def deliver(destination: ActorPath)(deliveryIdToMessage: Long => Any): Unit

  /**
   * Marks the delivery with `deliveryId` as confirmed.
   *
   * @return `true` the first time the id is confirmed, `false` otherwise
   */
  def confirmDelivery(deliveryId: Long): Boolean

  /** Number of deliveries that have not been confirmed yet. */
  def numberOfUnconfirmed: Int
}
154
```
155
156
[At-Least-Once Delivery](./at-least-once-delivery.md)
157
158
### Durable State Management
159
160
Durable state storage for mutable state management with revision tracking and pluggable storage backends.
161
162
```scala { .api }
163
/** Read API of a durable state store holding values of type `A`. */
trait DurableStateStore[A] {

  /** Asynchronously looks up the stored value and revision for `persistenceId`. */
  def getObject(persistenceId: String): Future[GetObjectResult[A]]
}
166
167
/** Result of a durable state lookup: the stored `value`, if any, and its `revision`. */
final case class GetObjectResult[A](value: Option[A], revision: Long)
168
```
169
170
[Durable State](./durable-state.md)
171
172
### Journal API
173
174
Journal plugin development interfaces for implementing custom persistence backends with asynchronous write capabilities.
175
176
```scala { .api }
177
import scala.util.Try
178
179
/** Plugin API for journals that write events asynchronously. */
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {

  /**
   * Asynchronously writes a batch of atomic writes.
   * Each `Try` in the result corresponds to the `AtomicWrite` at the same
   * position: a `Failure` rejects that write, while a failed `Future`
   * signals a failure of the journal itself.
   */
  def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]

  /** Asynchronously deletes the messages of `persistenceId` up to and including `toSequenceNr`. */
  def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
}
183
184
/** Plugin API for the recovery (read) side of a journal. */
trait AsyncRecovery {

  /**
   * Asynchronously replays the messages of `persistenceId` with sequence
   * numbers from `fromSequenceNr` to `toSequenceNr` (both inclusive), at
   * most `max` of them, passing each one to `recoveryCallback`.
   * The returned future completes when the replay is done.
   */
  def asyncReplayMessages(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long,
    max: Long
  )(recoveryCallback: PersistentRepr => Unit): Future[Unit]

  /**
   * Asynchronously reads the highest stored sequence number for
   * `persistenceId`; `fromSequenceNr` is a hint for where to start searching.
   */
  def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
}
188
```
189
190
[Journal API](./journal-api.md)
191
192
### Plugin Development
193
194
Comprehensive plugin development APIs for custom journal and snapshot store implementations with testing and deployment guides.
195
196
```scala { .api }
197
/** Plugin API for snapshot store implementations. */
trait SnapshotStore extends Actor with ActorLogging {

  /** Asynchronously loads a snapshot of `persistenceId` matching `criteria`, if there is one. */
  def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]

  /** Asynchronously stores `snapshot` under the given `metadata`. */
  def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]

  /** Asynchronously deletes the single snapshot identified by `metadata`. */
  def deleteAsync(metadata: SnapshotMetadata): Future[Unit]

  /** Asynchronously deletes every snapshot of `persistenceId` matching `criteria`. */
  def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]
}
203
```
204
205
[Plugin Development](./plugin-development.md)
206
207
## Common Types
208
209
```scala { .api }
210
// Core persistence types
211
/** Plugin API: representation of a single persistent message. */
trait PersistentRepr extends Message {

  /** Payload of this message, i.e. the event. */
  def payload: Any

  /** Manifest used by event adapters. */
  def manifest: String

  /** Id of the persistent actor this message belongs to. */
  def persistenceId: String

  /** Sequence number of this message. */
  def sequenceNr: Long

  /** Timestamp recorded when the message was stored. */
  def timestamp: Long

  /** Additional metadata, if any. */
  def metadata: Option[Any]

  /** Unique identifier of the writer. */
  def writerUuid: String

  /** Deletion flag (deprecated). */
  def deleted: Boolean

  /** Original sender of the message (deprecated). */
  def sender: ActorRef

  /** Returns a new persistent repr carrying `payload`. */
  def withPayload(payload: Any): PersistentRepr

  /** Returns a new persistent repr carrying `manifest`. */
  def withManifest(manifest: String): PersistentRepr

  /** Returns a new persistent repr carrying `newTimestamp`. */
  def withTimestamp(newTimestamp: Long): PersistentRepr

  /** Returns a new persistent repr carrying `metadata`. */
  def withMetadata(metadata: Any): PersistentRepr

  /** Returns an updated copy; every argument defaults to the current value. */
  def update(
    sequenceNr: Long = sequenceNr,
    persistenceId: String = persistenceId,
    deleted: Boolean = deleted,
    sender: ActorRef = sender,
    writerUuid: String = writerUuid
  ): PersistentRepr
}
260
261
object PersistentRepr {

  /** Plugin API: marker value for a persistenceId or manifest that is not defined. */
  val Undefined = ""

  /**
   * Plugin API: factory method.
   * Only `payload` is required; all other fields have the defaults shown here.
   */
  def apply(
    payload: Any,
    sequenceNr: Long = 0L,
    persistenceId: String = PersistentRepr.Undefined,
    manifest: String = PersistentRepr.Undefined,
    deleted: Boolean = false,
    sender: ActorRef = null,
    writerUuid: String = PersistentRepr.Undefined
  ): PersistentRepr
}
276
277
/** A batch of persistent messages, given as `payload`, that is written as one unit. */
final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) {

  /** Persistence id, taken from the first message of the batch. */
  def persistenceId: String

  /** Lowest sequence number in the batch. */
  def lowestSequenceNr: Long

  /** Highest sequence number in the batch. */
  def highestSequenceNr: Long

  /** Number of messages in the batch. */
  def size: Int
}
290
291
// Recovery configuration
292
/**
 * Recovery settings of a persistent actor.
 *
 * @param fromSnapshot criteria for the snapshot that recovery starts from
 * @param toSequenceNr upper sequence number bound for replayed messages
 * @param replayMax    maximum number of messages to replay
 */
final case class Recovery(
  fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
  toSequenceNr: Long = Long.MaxValue,
  replayMax: Long = Long.MaxValue
)

/** Sent to a persistent actor when recovery has completed. */
case object RecoveryCompleted
299
300
// Snapshot types
301
/**
 * Metadata describing a stored snapshot.
 *
 * @param persistenceId id of the persistent actor the snapshot was taken from
 * @param sequenceNr    sequence number at which the snapshot was taken
 * @param timestamp     time at which the snapshot was saved; defaults to `0L`
 * @param metadata      additional metadata, if any
 */
case class SnapshotMetadata(
  persistenceId: String,
  sequenceNr: Long,
  timestamp: Long = 0L,
  metadata: Option[Any] = None
)
307
308
// Journal response messages
309
/** Reply for a successful message deletion up to `toSequenceNr`. */
final case class DeleteMessagesSuccess(toSequenceNr: Long)

/** Reply for a failed message deletion up to `toSequenceNr`; `cause` is the failure. */
final case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long)
311
312
// Stash overflow strategies
313
/** Strategy applied when the internal stash of a persistent actor overflows. */
sealed trait StashOverflowStrategy

/** Discards the message to dead letters. */
case object DiscardToDeadLetterStrategy extends StashOverflowStrategy

/** Throws an overflow exception. */
case object ThrowOverflowExceptionStrategy extends StashOverflowStrategy

/** Replies to the sender with `response`. */
final case class ReplyToStrategy(response: Any) extends StashOverflowStrategy
317
```