0
# Durable State Management
1
2
Durable state storage for mutable state management with revision tracking and pluggable storage backends.
3
4
## Capabilities
5
6
### DurableStateStore (Scala API)
7
8
API for reading durable state objects with type safety.
9
10
```scala { .api }
11
/**
12
* API for reading durable state objects
13
*/
14
trait DurableStateStore[A] {
15
/** Retrieve state object by persistence ID */
16
def getObject(persistenceId: String): Future[GetObjectResult[A]]
17
}
18
```
19
20
### GetObjectResult
21
22
Result container for durable state retrieval operations.
23
24
```scala { .api }
25
/**
26
* Result of durable state retrieval containing value and revision
27
*/
28
case class GetObjectResult[A](
29
value: Option[A],
30
revision: Long
31
) {
32
/** Convert to Java API result */
33
def toJava: JGetObjectResult[A] = JGetObjectResult(value.asJava, revision)
34
}
35
```
36
37
**Usage Examples:**
38
39
```scala
40
import akka.persistence.state.scaladsl.DurableStateStore
41
import scala.concurrent.Future
42
43
// Basic state retrieval
44
val stateStore: DurableStateStore[UserProfile] = // ... obtain store
45
val userStateFuture: Future[GetObjectResult[UserProfile]] =
46
stateStore.getObject("user-123")
47
48
userStateFuture.foreach { result =>
49
result.value match {
50
case Some(profile) =>
51
println(s"User profile at revision ${result.revision}: $profile")
52
case None =>
53
println("No state found for user-123")
54
}
55
}
56
```
57
58
### DurableStateUpdateStore (Scala API)
59
60
Extended API for reading and updating durable state objects.
61
62
```scala { .api }
63
/**
64
* API for reading and updating durable state objects
65
*/
66
trait DurableStateUpdateStore[A] extends DurableStateStore[A] {
67
/**
68
* Upsert state object with optimistic concurrency control
69
* @param persistenceId Unique identifier for the state
70
* @param revision Expected current revision (starts at 1)
71
* @param value New state value
72
* @param tag Optional tag for the update
73
* @return Future completed when operation finishes
74
*/
75
def upsertObject(
76
persistenceId: String,
77
revision: Long,
78
value: A,
79
tag: String
80
): Future[Done]
81
82
/**
83
* Delete state object
84
* @param persistenceId Unique identifier for the state
85
* @param revision Expected current revision
86
* @return Future completed when operation finishes
87
*/
88
def deleteObject(
89
persistenceId: String,
90
revision: Long
91
): Future[Done]
92
}
93
```
94
95
96
### Java API
97
98
#### DurableStateStore (Java API)
99
100
```scala { .api }
101
/**
102
* Java API for reading durable state objects
103
*/
104
trait DurableStateStore[A] {
105
/** Retrieve state object by persistence ID */
106
def getObject(persistenceId: String): CompletionStage[JGetObjectResult[A]]
107
}
108
```
109
110
#### JGetObjectResult
111
112
```scala { .api }
113
/**
114
* Java API result of durable state retrieval
115
*/
116
case class JGetObjectResult[A](
117
value: Optional[A],
118
revision: Long
119
)
120
```
121
122
#### DurableStateUpdateStore (Java API)
123
124
```scala { .api }
125
/**
126
* Java API for reading and updating durable state objects
127
*/
128
trait DurableStateUpdateStore[A] extends DurableStateStore[A] {
129
/** Upsert state object with optimistic concurrency control */
130
def upsertObject(
131
persistenceId: String,
132
revision: Long,
133
value: A,
134
tag: String
135
): CompletionStage[Done]
136
137
/** Delete state object */
138
def deleteObject(
139
persistenceId: String,
140
revision: Long
141
): CompletionStage[Done]
142
}
143
```
144
145
### Durable State Registry and Provider
146
147
#### DurableStateStoreRegistry
148
149
Registry for obtaining durable state store instances.
150
151
```scala { .api }
152
/**
153
* Registry for obtaining configured durable state store instances
154
*/
155
object DurableStateStoreRegistry extends ExtensionId[DurableStateStoreRegistry] {
156
/** Get durable state store for the specified plugin ID */
157
def durableStateStoreFor[A](
158
system: ActorSystem,
159
pluginId: String
160
): DurableStateStore[A]
161
}
162
163
class DurableStateStoreRegistry(system: ExtendedActorSystem) extends Extension {
164
/** Get durable state store by plugin ID */
165
def durableStateStoreFor[A](pluginId: String): DurableStateStore[A]
166
}
167
```
168
169
#### DurableStateStoreProvider
170
171
Provider interface for durable state store plugins.
172
173
```scala { .api }
174
/**
175
* Provider interface for durable state store plugin implementations
176
*/
177
trait DurableStateStoreProvider {
178
/** Create durable state store instance */
179
def scaladslDurableStateStore(): DurableStateStore[Any]
180
181
/** Create Java API durable state store instance */
182
def javadslDurableStateStore(): JDurableStateStore[AnyRef]
183
}
184
```
185
186
### Example: User Profile Management
187
188
```scala
189
import akka.persistence.state.scaladsl.{DurableStateStore, DurableStateUpdateStore}
190
import akka.actor.ActorSystem
191
import akka.Done
192
import scala.concurrent.{Future, ExecutionContext}
193
import scala.util.{Success, Failure}
194
195
// Domain model
196
case class UserProfile(
197
userId: String,
198
name: String,
199
email: String,
200
preferences: Map[String, Any],
201
lastModified: Long
202
)
203
204
class UserProfileService(
205
stateStore: DurableStateUpdateStore[UserProfile]
206
)(implicit ec: ExecutionContext) {
207
208
def getProfile(userId: String): Future[Option[UserProfile]] = {
209
stateStore.getObject(s"user-profile-$userId").map(_.value)
210
}
211
212
def createProfile(profile: UserProfile): Future[Done] = {
213
val profileWithTimestamp = profile.copy(lastModified = System.currentTimeMillis())
214
stateStore.upsertObject(
215
persistenceId = s"user-profile-${profile.userId}",
216
revision = 0L, // New profile
217
value = profileWithTimestamp,
218
tag = ""
219
)
220
}
221
222
def updateProfile(userId: String, updater: UserProfile => UserProfile): Future[Option[Done]] = {
223
for {
224
current <- stateStore.getObject(s"user-profile-$userId")
225
result <- current.value match {
226
case Some(profile) =>
227
val updated = updater(profile).copy(lastModified = System.currentTimeMillis())
228
stateStore.upsertObject(
229
persistenceId = s"user-profile-$userId",
230
revision = current.revision,
231
value = updated,
232
tag = ""
233
).map(result => Some(result))
234
case None =>
235
Future.successful(None)
236
}
237
} yield result
238
}
239
240
def deleteProfile(userId: String): Future[Boolean] = {
241
for {
242
current <- stateStore.getObject(s"user-profile-$userId")
243
result <- current.value match {
244
case Some(_) =>
245
stateStore.deleteObject(
246
persistenceId = s"user-profile-$userId",
247
revision = current.revision
248
).map(_ => true)
249
case None =>
250
Future.successful(false)
251
}
252
} yield result
253
}
254
}
255
256
// Usage example
257
implicit val system: ActorSystem = ActorSystem("user-service")
258
implicit val ec: ExecutionContext = system.dispatcher
259
260
val stateStore: DurableStateUpdateStore[UserProfile] =
261
DurableStateStoreRegistry(system).durableStateStoreFor("my-state-store")
262
263
val userService = new UserProfileService(stateStore)
264
265
// Create new profile
266
val newProfile = UserProfile(
267
userId = "user123",
268
name = "John Doe",
269
email = "john@example.com",
270
preferences = Map("theme" -> "dark", "notifications" -> true),
271
lastModified = 0L
272
)
273
274
userService.createProfile(newProfile).onComplete {
275
case Success(_) => println("Profile created successfully")
276
case Failure(ex) => println(s"Failed to create profile: ${ex.getMessage}")
277
}
278
279
// Update profile
280
userService.updateProfile("user123", profile =>
281
profile.copy(preferences = profile.preferences + ("language" -> "en"))
282
).onComplete {
283
case Success(Some(_)) => println("Profile updated successfully")
284
case Success(None) => println("Profile not found")
285
case Failure(ex) => println(s"Failed to update profile: ${ex.getMessage}")
286
}
287
```
288
289
### Example: Configuration Management
290
291
```scala
292
import akka.persistence.state.scaladsl.DurableStateUpdateStore
293
import akka.Done
294
import scala.concurrent.{Future, ExecutionContext}
295
296
case class ApplicationConfig(
297
version: String,
298
features: Set[String],
299
settings: Map[String, Any],
300
environment: String
301
)
302
303
class ConfigurationManager(
304
stateStore: DurableStateUpdateStore[ApplicationConfig]
305
)(implicit ec: ExecutionContext) {
306
307
private val configId = "application-config"
308
309
def getCurrentConfig: Future[ApplicationConfig] = {
310
stateStore.getObject(configId).map { result =>
311
result.value.getOrElse(getDefaultConfig)
312
}
313
}
314
315
def updateConfig(config: ApplicationConfig): Future[Done] = {
316
for {
317
current <- stateStore.getObject(configId)
318
revision = current.revision
319
result <- stateStore.upsertObject(configId, revision, config, "")
320
} yield result
321
}
322
323
def enableFeature(feature: String): Future[Done] = {
324
updateConfigField { config =>
325
config.copy(features = config.features + feature)
326
}
327
}
328
329
def disableFeature(feature: String): Future[Done] = {
330
updateConfigField { config =>
331
config.copy(features = config.features - feature)
332
}
333
}
334
335
def updateSetting(key: String, value: Any): Future[Done] = {
336
updateConfigField { config =>
337
config.copy(settings = config.settings + (key -> value))
338
}
339
}
340
341
private def updateConfigField(updater: ApplicationConfig => ApplicationConfig): Future[Done] = {
342
for {
343
current <- getCurrentConfig
344
updated = updater(current)
345
result <- updateConfig(updated)
346
} yield result
347
}
348
349
private def getDefaultConfig: ApplicationConfig = {
350
ApplicationConfig(
351
version = "1.0.0",
352
features = Set.empty,
353
settings = Map.empty,
354
environment = "development"
355
)
356
}
357
}
358
```
359
360
### Error Handling and Optimistic Concurrency
361
362
```scala
363
import akka.persistence.state.{DurableStateStoreException, RevisionMismatchException}
364
365
class SafeStateManager[T](
366
stateStore: DurableStateUpdateStore[T]
367
)(implicit ec: ExecutionContext) {
368
369
def safeUpdate(
370
persistenceId: String,
371
updater: T => T,
372
maxRetries: Int = 3
373
): Future[Either[String, Long]] = {
374
375
def attemptUpdate(retryCount: Int): Future[Either[String, Long]] = {
376
for {
377
current <- stateStore.getObject(persistenceId)
378
result <- current.value match {
379
case Some(value) =>
380
val updated = updater(value)
381
stateStore.upsertObject(persistenceId, current.revision, updated, "")
382
.map(_ => Right(current.revision + 1))
383
.recover {
384
case _: RevisionMismatchException if retryCount < maxRetries =>
385
// Retry on revision mismatch (optimistic concurrency conflict)
386
Left("revision_mismatch")
387
case ex: DurableStateStoreException =>
388
Left(s"Store error: ${ex.getMessage}")
389
case ex =>
390
Left(s"Unexpected error: ${ex.getMessage}")
391
}
392
case None =>
393
Future.successful(Left("not_found"))
394
}
395
finalResult <- result match {
396
case Left("revision_mismatch") => attemptUpdate(retryCount + 1)
397
case other => Future.successful(other)
398
}
399
} yield finalResult
400
}
401
402
attemptUpdate(0)
403
}
404
}
405
```
406
407
### Configuration
408
409
Durable state stores are configured in application.conf:
410
411
```hocon
412
akka.persistence.state {
413
plugin = "akka.persistence.state.inmem"
414
415
# In-memory state store (for testing)
416
inmem {
417
class = "akka.persistence.state.InMemDurableStateStoreProvider"
418
}
419
420
# Custom state store plugin
421
my-state-store {
422
class = "com.example.MyDurableStateStoreProvider"
423
424
# Plugin-specific configuration
425
connection-string = "postgresql://localhost/mydb"
426
table-name = "durable_state"
427
}
428
}
429
```