# Persistent Actors

Core persistent actor functionality for event sourcing patterns with automatic state recovery and configurable persistence backends.

## Capabilities

### PersistentActor Trait

The main trait for implementing persistent actors using event sourcing patterns.

```scala { .api }
/**
 * Scala API for persistent actors; usable for implementing command or Event Sourcing patterns.
 */
trait PersistentActor
    extends Eventsourced
    with PersistenceStash
    with PersistenceIdentity
    with PersistenceRecovery {

  /** Handler for incoming messages (commands). */
  def receiveCommand: Receive

  /** Handler invoked during recovery, once for every replayed event. */
  def receiveRecover: Receive

  /** Persists a single event asynchronously, with stashing; `handler` receives the event. */
  def persist[A](event: A)(handler: A => Unit): Unit

  /** Persists several events atomically and asynchronously, with stashing. */
  def persistAll[A](events: immutable.Seq[A])(handler: A => Unit): Unit

  /** Persists a single event asynchronously, without stashing. */
  def persistAsync[A](event: A)(handler: A => Unit): Unit

  /** Persists several events asynchronously, without stashing. */
  def persistAllAsync[A](events: immutable.Seq[A])(handler: A => Unit): Unit

  /** Defers execution of `handler`, with stashing. */
  def defer[A](event: A)(handler: A => Unit): Unit

  /** Defers execution of `handler`, without stashing. */
  def deferAsync[A](event: A)(handler: A => Unit): Unit
}
```

**Usage Examples:**

```scala
import akka.persistence._

/**
 * Minimal persistent actor: the state is simply the most recently persisted string.
 */
class MyPersistentActor extends PersistentActor {

  // Rebuilt from the snapshot and/or the replayed events on recovery.
  var state: String = ""

  override def persistenceId: String = "my-persistent-actor"

  // Recovery: a snapshot offer or a replayed event both overwrite the current state.
  override def receiveRecover: Receive = {
    case SnapshotOffer(_, saved: String) => state = saved
    case replayed: String => state = replayed
  }

  // Commands: "cmd" persists the event "evt"; "get" replies with the current state.
  override def receiveCommand: Receive = {
    case "get" => sender() ! state
    case "cmd" => persist("evt")(acknowledge)
  }

  /** Runs once the event has been persisted: adopt it as the new state, then confirm to the sender. */
  private def acknowledge(persisted: String): Unit = {
    state = persisted
    sender() ! "ok"
  }
}
```

### AbstractPersistentActor (Java API)

Java API for persistent actors.

```scala { .api }
/**
 * Base class for persistent actors written against the Java API.
 */
abstract class AbstractPersistentActor
    extends AbstractActor
    with Eventsourced
    with PersistenceStash
    with PersistenceIdentity
    with PersistenceRecovery {

  /** Command handler; override to handle commands. */
  def createReceive(): AbstractActor.Receive

  /** Recovery handler; override to handle events during recovery. */
  def createReceiveRecover(): AbstractActor.Receive

  /** Persists one event; `handler` is a Java-style [[Procedure]]. */
  def persist[A](event: A, handler: Procedure[A]): Unit

  /** Persists a list of events; `handler` is a Java-style [[Procedure]]. */
  def persistAll[A](events: java.util.List[A], handler: Procedure[A]): Unit

  /** Asynchronous variant of `persist` taking a Java-style [[Procedure]]. */
  def persistAsync[A](event: A, handler: Procedure[A]): Unit

  /** Asynchronous variant of `persistAll` taking a Java-style [[Procedure]]. */
  def persistAllAsync[A](events: java.util.List[A], handler: Procedure[A]): Unit
}
```

### AbstractPersistentActorWithTimers

Java API combining AbstractPersistentActor with timers functionality.

```scala { .api }
/**
 * Java API: an [[AbstractPersistentActor]] that additionally mixes in [[Timers]].
 */
abstract class AbstractPersistentActorWithTimers
    extends AbstractPersistentActor
    with Timers
```

### Core Supporting Traits

#### PersistenceIdentity

Identifies persistent actors with unique identifiers and plugin configurations.

```scala { .api }
/**
 * Identity of a persistent entity plus the ids of the plugins that serve it.
 */
trait PersistenceIdentity {

  /** Id of the persistent entity; must be unique. */
  def persistenceId: String

  /** Configuration id of the journal plugin; empty unless overridden. */
  def journalPluginId: String = ""

  /** Configuration id of the snapshot plugin; empty unless overridden. */
  def snapshotPluginId: String = ""
}
```

#### PersistenceRecovery

Defines recovery behavior and configuration.

```scala { .api }
/**
 * Lets a persistent actor declare how it wants to be recovered.
 */
trait PersistenceRecovery {

  /** Recovery settings for this actor; `Recovery()` unless overridden. */
  def recovery: Recovery = Recovery()
}
```

#### PersistenceStash

Stashing functionality for persistent actors with configurable overflow strategies.

```scala { .api }
/**
 * Stash support for persistent actors, with a configurable reaction to stash overflow.
 */
trait PersistenceStash extends Stash {

  /**
   * Strategy applied to a message that cannot be stashed because the internal stash is full.
   *
   * The default is not a fixed strategy: it is resolved from the
   * `akka.persistence.internal-stash-overflow-strategy` setting, which Akka's reference
   * configuration points at the configurator for `ThrowOverflowExceptionStrategy`
   * (not `DiscardToDeadLetterStrategy`). Override this member to pick a strategy per actor.
   */
  def internalStashOverflowStrategy: StashOverflowStrategy =
    Persistence(context.system).defaultInternalStashOverflowStrategy
}
```

#### RuntimePluginConfig

Runtime configuration for persistence plugins.

```scala { .api }
/**
 * Supplies extra plugin configuration at runtime.
 */
trait RuntimePluginConfig {

  /** Extra configuration for the journal plugin; an empty config unless overridden. */
  def journalPluginConfig: Config = ConfigFactory.empty()

  /** Extra configuration for the snapshot plugin; an empty config unless overridden. */
  def snapshotPluginConfig: Config = ConfigFactory.empty()
}
```

### Recovery Types

#### Recovery Configuration

Configures the recovery process for persistent actors.

```scala { .api }
/**
 * Recovery mode configuration.
 *
 * @param fromSnapshot criteria selecting the snapshot recovery starts from; the latest one by default
 * @param toSequenceNr upper sequence-number bound for replayed events; unbounded by default
 * @param replayMax    maximum number of events to replay; unbounded by default
 */
case class Recovery(
  fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
  toSequenceNr: Long = Long.MaxValue,
  replayMax: Long = Long.MaxValue
)

object Recovery {

  /**
   * Skips recovery: no snapshot is selected and no events are replayed.
   *
   * Only `fromSnapshot` and `toSequenceNr` are set, matching Akka's own definition;
   * `replayMax` keeps its default, so this value equals
   * `Recovery(SnapshotSelectionCriteria.None, 0L)`.
   */
  val none: Recovery = Recovery(toSequenceNr = 0L, fromSnapshot = SnapshotSelectionCriteria.None)

  /** Java API: default recovery. */
  def create(): Recovery = Recovery()

  /** Java API: recovery starting from a snapshot matching `fromSnapshot`. */
  def create(fromSnapshot: SnapshotSelectionCriteria): Recovery = Recovery(fromSnapshot)

  /** Java API: recovery bounded by `toSequenceNr`. */
  def create(toSequenceNr: Long): Recovery = Recovery(toSequenceNr = toSequenceNr)

  /** Java API: recovery with both a snapshot criteria and a sequence-number bound. */
  def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long): Recovery =
    Recovery(fromSnapshot, toSequenceNr)

  /** Java API: recovery with snapshot criteria, sequence-number bound and replay limit. */
  def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long, replayMax: Long): Recovery =
    Recovery(fromSnapshot, toSequenceNr, replayMax)
}
```

#### Recovery Messages

Messages sent during the recovery process.

```scala { .api }
/** Message sent once journal replay has finished. */
case object RecoveryCompleted {

  /** Java API: accessor for the singleton instance. */
  def getInstance: RecoveryCompleted.type = RecoveryCompleted
}

/**
 * Thrown when recovery times out.
 *
 * @param message description of the timeout
 */
class RecoveryTimedOut(message: String) extends RuntimeException(message)
```

### Stash Overflow Strategies

#### Base Strategy Trait

```scala { .api }
/**
 * Common supertype of every stash-overflow handling strategy.
 */
sealed trait StashOverflowStrategy
```

#### Built-in Strategies

```scala { .api }
/** On stash overflow, the message is discarded to dead letters. */
case object DiscardToDeadLetterStrategy extends StashOverflowStrategy {

  /** Java API: accessor for the singleton instance. */
  def getInstance: DiscardToDeadLetterStrategy.type = DiscardToDeadLetterStrategy
}

/** On stash overflow, an exception is thrown. */
case object ThrowOverflowExceptionStrategy extends StashOverflowStrategy {

  /** Java API: accessor for the singleton instance. */
  def getInstance: ThrowOverflowExceptionStrategy.type = ThrowOverflowExceptionStrategy
}

/**
 * On stash overflow, the message is discarded and a predefined reply is sent.
 *
 * @param response the predefined reply
 */
case class ReplyToStrategy(response: Any) extends StashOverflowStrategy
```

#### Strategy Configurator

```scala { .api }
/**
 * Factory interface used to configure a stash overflow strategy.
 */
trait StashOverflowStrategyConfigurator {

  /** Builds the [[StashOverflowStrategy]] described by `config`. */
  def create(config: Config): StashOverflowStrategy
}
```

### Internal Actor State Access

The Eventsourced trait provides access to internal persistent actor state.

```scala { .api }
/**
 * Exposes the internal persistence state of a persistent actor.
 */
trait Eventsourced extends Actor {

  /** The highest sequence number received so far. */
  def lastSequenceNr: Long

  /** The sequence number currently associated with snapshots. */
  def snapshotSequenceNr: Long

  /** `true` while recovery is in progress. */
  def recoveryRunning: Boolean

  /** `true` once recovery has completed. */
  def recoveryFinished: Boolean

  /** Deletes persistent messages up to sequence number `toSequenceNr`. */
  def deleteMessages(toSequenceNr: Long): Unit
}
```

### Example: Complete Event Sourced Actor

```scala
import akka.persistence._
import akka.actor.{ActorRef, Props}

// Events
sealed trait CounterEvent
case object Incremented extends CounterEvent
case object Decremented extends CounterEvent

// Commands
sealed trait CounterCommand
case object Increment extends CounterCommand
case object Decrement extends CounterCommand
case object GetValue extends CounterCommand

/**
 * Event-sourced counter.
 *
 * `Increment` / `Decrement` persist the matching event, apply it, and reply with the new
 * value; `GetValue` replies with the current value. A snapshot of the value is saved
 * automatically every `snapshotInterval` persisted events.
 */
class Counter extends PersistentActor {
  override def persistenceId: String = "counter-1"

  /** Number of persisted events between two automatic snapshots. */
  private val snapshotInterval = 10

  private var value = 0

  /** The single place where an event changes state, shared by recovery and command handling. */
  private def applyEvent(event: CounterEvent): Unit = event match {
    case Incremented => value += 1
    case Decremented => value -= 1
  }

  override def receiveRecover: Receive = {
    case event: CounterEvent => applyEvent(event)
    case SnapshotOffer(_, snapshot: Int) => value = snapshot
  }

  override def receiveCommand: Receive = {
    case Increment => persistAndReply(Incremented)
    case Decrement => persistAndReply(Decremented)
    case GetValue => sender() ! value

    // Explicit snapshot request; honoured only when the sequence number sits on an
    // interval boundary (kept so existing senders of "snap" behave as before).
    case "snap" if lastSequenceNr % snapshotInterval == 0 => saveSnapshot(value)
  }

  /** Persists `event`, then applies it, replies with the new value and snapshots if due. */
  private def persistAndReply(event: CounterEvent): Unit =
    persist(event) { persisted =>
      applyEvent(persisted)
      sender() ! value
      // Take a snapshot every `snapshotInterval` events. This has to happen here, after a
      // successful persist, because no other code path sends the "snap" message.
      if (lastSequenceNr % snapshotInterval == 0 && lastSequenceNr != 0) saveSnapshot(value)
    }
}
```