or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

at-least-once-delivery.md durable-state.md event-adapters.md index.md journal-api.md persistent-actors.md plugin-development.md snapshots.md

docs/persistent-actors.md

0

# Persistent Actors

1

2

Core persistent actor functionality for event sourcing patterns with automatic state recovery and configurable persistence backends.

3

4

## Capabilities

5

6

### PersistentActor Trait

7

8

The main trait for implementing persistent actors using event sourcing patterns.

9

10

```scala { .api }

11

/**

12

* Scala API for persistent actors, which can be used to implement command sourcing or event sourcing patterns

13

*/

14

trait PersistentActor extends Eventsourced with PersistenceStash with PersistenceIdentity with PersistenceRecovery {

15

16

/** Asynchronously persists an event; incoming commands are stashed until the handler has been invoked */

17

def persist[A](event: A)(handler: A => Unit): Unit

18

19

/** Asynchronously persists multiple events atomically with stashing */

20

def persistAll[A](events: immutable.Seq[A])(handler: A => Unit): Unit

21

22

/** Asynchronously persists an event without stashing; incoming commands keep being processed while the write is pending */

23

def persistAsync[A](event: A)(handler: A => Unit): Unit

24

25

/** Asynchronously persists multiple events without stashing */

26

def persistAllAsync[A](events: immutable.Seq[A])(handler: A => Unit): Unit

27

28

/** Defer handler execution with stashing */

29

def defer[A](event: A)(handler: A => Unit): Unit

30

31

/** Defer handler execution without stashing */

32

def deferAsync[A](event: A)(handler: A => Unit): Unit

33

34

/** Recovery handler - called during recovery for each replayed event and for an offered snapshot (SnapshotOffer) */

35

def receiveRecover: Receive

36

37

/** Command handler - called for incoming messages */

38

def receiveCommand: Receive

39

}

40

```

41

42

**Usage Examples:**

43

44

```scala

45

import akka.persistence._

46

47

class MyPersistentActor extends PersistentActor {

48

override def persistenceId: String = "my-persistent-actor"

49

50

var state: String = ""

51

52

override def receiveRecover: Receive = {

53

case evt: String => state = evt

54

case SnapshotOffer(_, snapshot: String) => state = snapshot

55

}

56

57

override def receiveCommand: Receive = {

58

case "cmd" =>

59

persist("evt") { event =>

60

state = event

61

sender() ! "ok"

62

}

63

case "get" => sender() ! state

64

}

65

}

66

```

67

68

### AbstractPersistentActor (Java API)

69

70

Java API for persistent actors.

71

72

```scala { .api }

73

/**

74

* Java API for persistent actors

75

*/

76

abstract class AbstractPersistentActor extends AbstractActor with Eventsourced with PersistenceStash with PersistenceIdentity with PersistenceRecovery {

77

78

/** Recovery handler - override to handle recovery events */

79

def createReceiveRecover(): AbstractActor.Receive

80

81

/** Command handler - override to handle commands */

82

def createReceive(): AbstractActor.Receive

83

84

/** Persist event with Java-style handler */

85

def persist[A](event: A, handler: Procedure[A]): Unit

86

87

/** Persist multiple events with Java-style handler */

88

def persistAll[A](events: java.util.List[A], handler: Procedure[A]): Unit

89

90

/** Persist event asynchronously with Java-style handler */

91

def persistAsync[A](event: A, handler: Procedure[A]): Unit

92

93

/** Persist multiple events asynchronously with Java-style handler */

94

def persistAllAsync[A](events: java.util.List[A], handler: Procedure[A]): Unit

95

}

96

```

97

98

### AbstractPersistentActorWithTimers

99

100

Java API combining AbstractPersistentActor with timers functionality.

101

102

```scala { .api }

103

/**

104

* Java API combining AbstractPersistentActor with timers functionality

105

*/

106

abstract class AbstractPersistentActorWithTimers extends AbstractPersistentActor with Timers

107

```

108

109

### Core Supporting Traits

110

111

#### PersistenceIdentity

112

113

Identifies persistent actors with unique identifiers and plugin configurations.

114

115

```scala { .api }

116

trait PersistenceIdentity {

117

/** Unique identifier for the persistent entity */

118

def persistenceId: String

119

120

/** Journal plugin configuration id (defaults to empty) */

121

def journalPluginId: String = ""

122

123

/** Snapshot plugin configuration id (defaults to empty) */

124

def snapshotPluginId: String = ""

125

}

126

```

127

128

#### PersistenceRecovery

129

130

Defines recovery behavior and configuration.

131

132

```scala { .api }

133

trait PersistenceRecovery {

134

/** Recovery configuration (defaults to Recovery()) */

135

def recovery: Recovery = Recovery()

136

}

137

```

138

139

#### PersistenceStash

140

141

Stashing functionality for persistent actors with configurable overflow strategies.

142

143

```scala { .api }

144

trait PersistenceStash extends Stash {

145

/** Strategy for handling stash overflow */

146

def internalStashOverflowStrategy: StashOverflowStrategy = DiscardToDeadLetterStrategy

147

}

148

```

149

150

#### RuntimePluginConfig

151

152

Runtime configuration for persistence plugins.

153

154

```scala { .api }

155

trait RuntimePluginConfig {

156

/** Additional journal plugin configuration */

157

def journalPluginConfig: Config = ConfigFactory.empty()

158

159

/** Additional snapshot plugin configuration */

160

def snapshotPluginConfig: Config = ConfigFactory.empty()

161

}

162

```

163

164

### Recovery Types

165

166

#### Recovery Configuration

167

168

Configures the recovery process for persistent actors.

169

170

```scala { .api }

171

/**

172

* Recovery mode configuration

173

*/

174

case class Recovery(

175

fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,

176

toSequenceNr: Long = Long.MaxValue,

177

replayMax: Long = Long.MaxValue

178

)

179

180

object Recovery {

181

/** Skip recovery configuration */

182

val none: Recovery = Recovery(SnapshotSelectionCriteria.None, 0L, 0L)

183

184

/** Java API factory methods */

185

def create(): Recovery = Recovery()

186

def create(fromSnapshot: SnapshotSelectionCriteria): Recovery = Recovery(fromSnapshot)

187

def create(toSequenceNr: Long): Recovery = Recovery(toSequenceNr = toSequenceNr)

188

def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long): Recovery = Recovery(fromSnapshot, toSequenceNr)

189

}

190

```

191

192

#### Recovery Messages

193

194

Messages sent during the recovery process.

195

196

```scala { .api }

197

/** Sent when journal replay is finished */

198

case object RecoveryCompleted {

199

/** Java API */

200

def getInstance: RecoveryCompleted.type = this

201

}

202

203

/** Exception thrown when recovery times out */

204

class RecoveryTimedOut(message: String) extends RuntimeException(message)

205

```

206

207

### Stash Overflow Strategies

208

209

#### Base Strategy Trait

210

211

```scala { .api }

212

/** Base trait for stash overflow handling strategies */

213

sealed trait StashOverflowStrategy

214

```

215

216

#### Built-in Strategies

217

218

```scala { .api }

219

/** Discard messages to dead letters when stash overflows */

220

case object DiscardToDeadLetterStrategy extends StashOverflowStrategy {

221

/** Java API */

222

def getInstance: DiscardToDeadLetterStrategy.type = this

223

}

224

225

/** Throw exception when stash overflows */

226

case object ThrowOverflowExceptionStrategy extends StashOverflowStrategy {

227

/** Java API */

228

def getInstance: ThrowOverflowExceptionStrategy.type = this

229

}

230

231

/** Reply with predefined response and discard message */

232

case class ReplyToStrategy(response: Any) extends StashOverflowStrategy

233

```

234

235

#### Strategy Configurator

236

237

```scala { .api }

238

/** Interface for configuring stash overflow strategies */

239

trait StashOverflowStrategyConfigurator {

240

def create(config: Config): StashOverflowStrategy

241

}

242

```

243

244

### Internal Actor State Access

245

246

The Eventsourced trait provides access to internal persistent actor state.

247

248

```scala { .api }

249

trait Eventsourced extends Actor {

250

/** Highest sequence number received */

251

def lastSequenceNr: Long

252

253

/** Sequence number associated with a snapshot saved now (returns lastSequenceNr) */

254

def snapshotSequenceNr: Long

255

256

/** Whether recovery is in progress */

257

def recoveryRunning: Boolean

258

259

/** Whether recovery has completed */

260

def recoveryFinished: Boolean

261

262

/** Delete persistent messages up to sequence number */

263

def deleteMessages(toSequenceNr: Long): Unit

264

}

265

```

266

267

### Example: Complete Event Sourced Actor

268

269

```scala

270

import akka.persistence._

271

import akka.actor.{ActorRef, Props}

272

273

// Events

274

sealed trait CounterEvent

275

case object Incremented extends CounterEvent

276

case object Decremented extends CounterEvent

277

278

// Commands

279

sealed trait CounterCommand

280

case object Increment extends CounterCommand

281

case object Decrement extends CounterCommand

282

case object GetValue extends CounterCommand

283

284

class Counter extends PersistentActor {

285

override def persistenceId: String = "counter-1"

286

287

private var value = 0

288

289

override def receiveRecover: Receive = {

290

case Incremented => value += 1

291

case Decremented => value -= 1

292

case SnapshotOffer(_, snapshot: Int) => value = snapshot

293

}

294

295

override def receiveCommand: Receive = {

296

case Increment =>

297

persist(Incremented) { _ =>

298

value += 1

299

sender() ! value

300

}

301

case Decrement =>

302

persist(Decremented) { _ =>

303

value -= 1

304

sender() ! value

305

}

306

case GetValue => sender() ! value

307

308

// Save a snapshot on an explicit "snap" request, but only when lastSequenceNr is a multiple of 10

309

case "snap" if lastSequenceNr % 10 == 0 => saveSnapshot(value)

310

}

311

}

312

```