or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

at-least-once-delivery.mddurable-state.mdevent-adapters.mdindex.mdjournal-api.mdpersistent-actors.mdplugin-development.mdsnapshots.md

durable-state.mddocs/

0

# Durable State Management

1

2

Durable state storage for mutable state management with revision tracking and pluggable storage backends.

3

4

## Capabilities

5

6

### DurableStateStore (Scala API)

7

8

API for reading durable state objects with type safety.

9

10

```scala { .api }

11

/**

12

* API for reading durable state objects

13

*/

14

trait DurableStateStore[A] {

15

/** Retrieve state object by persistence ID */

16

def getObject(persistenceId: String): Future[GetObjectResult[A]]

17

}

18

```

19

20

### GetObjectResult

21

22

Result container for durable state retrieval operations.

23

24

```scala { .api }

25

/**

26

* Result of durable state retrieval containing value and revision

27

*/

28

case class GetObjectResult[A](

29

value: Option[A],

30

revision: Long

31

) {

32

/** Convert to Java API result */

33

def toJava: JGetObjectResult[A] = JGetObjectResult(value.asJava, revision)

34

}

35

```

36

37

**Usage Examples:**

38

39

```scala

40

import akka.persistence.state.scaladsl.DurableStateStore

41

import scala.concurrent.Future

42

43

// Basic state retrieval

44

val stateStore: DurableStateStore[UserProfile] = // ... obtain store

45

val userStateFuture: Future[GetObjectResult[UserProfile]] =

46

stateStore.getObject("user-123")

47

48

userStateFuture.foreach { result =>

49

result.value match {

50

case Some(profile) =>

51

println(s"User profile at revision ${result.revision}: $profile")

52

case None =>

53

println("No state found for user-123")

54

}

55

}

56

```

57

58

### DurableStateUpdateStore (Scala API)

59

60

Extended API for reading and updating durable state objects.

61

62

```scala { .api }

63

/**

64

* API for reading and updating durable state objects

65

*/

66

trait DurableStateUpdateStore[A] extends DurableStateStore[A] {

67

/**

68

* Upsert state object with optimistic concurrency control

69

* @param persistenceId Unique identifier for the state

70

* @param revision Expected current revision (starts at 1)

71

* @param value New state value

72

* @param tag Optional tag for the update

73

* @return Future completed when operation finishes

74

*/

75

def upsertObject(

76

persistenceId: String,

77

revision: Long,

78

value: A,

79

tag: String

80

): Future[Done]

81

82

/**

83

* Delete state object

84

* @param persistenceId Unique identifier for the state

85

* @param revision Expected current revision

86

* @return Future completed when operation finishes

87

*/

88

def deleteObject(

89

persistenceId: String,

90

revision: Long

91

): Future[Done]

92

}

93

```

94

95

96

### Java API

97

98

#### DurableStateStore (Java API)

99

100

```scala { .api }

101

/**

102

* Java API for reading durable state objects

103

*/

104

trait DurableStateStore[A] {

105

/** Retrieve state object by persistence ID */

106

def getObject(persistenceId: String): CompletionStage[JGetObjectResult[A]]

107

}

108

```

109

110

#### JGetObjectResult

111

112

```scala { .api }

113

/**

114

* Java API result of durable state retrieval

115

*/

116

case class JGetObjectResult[A](

117

value: Optional[A],

118

revision: Long

119

)

120

```

121

122

#### DurableStateUpdateStore (Java API)

123

124

```scala { .api }

125

/**

126

* Java API for reading and updating durable state objects

127

*/

128

trait DurableStateUpdateStore[A] extends DurableStateStore[A] {

129

/** Upsert state object with optimistic concurrency control */

130

def upsertObject(

131

persistenceId: String,

132

revision: Long,

133

value: A,

134

tag: String

135

): CompletionStage[Done]

136

137

/** Delete state object */

138

def deleteObject(

139

persistenceId: String,

140

revision: Long

141

): CompletionStage[Done]

142

}

143

```

144

145

### Durable State Registry and Provider

146

147

#### DurableStateStoreRegistry

148

149

Registry for obtaining durable state store instances.

150

151

```scala { .api }

152

/**

153

* Registry for obtaining configured durable state store instances

154

*/

155

object DurableStateStoreRegistry extends ExtensionId[DurableStateStoreRegistry] {

156

/** Get durable state store for the specified plugin ID */

157

def durableStateStoreFor[A](

158

system: ActorSystem,

159

pluginId: String

160

): DurableStateStore[A]

161

}

162

163

class DurableStateStoreRegistry(system: ExtendedActorSystem) extends Extension {

164

/** Get durable state store by plugin ID */

165

def durableStateStoreFor[A](pluginId: String): DurableStateStore[A]

166

}

167

```

168

169

#### DurableStateStoreProvider

170

171

Provider interface for durable state store plugins.

172

173

```scala { .api }

174

/**

175

* Provider interface for durable state store plugin implementations

176

*/

177

trait DurableStateStoreProvider {

178

/** Create durable state store instance */

179

def scaladslDurableStateStore(): DurableStateStore[Any]

180

181

/** Create Java API durable state store instance */

182

def javadslDurableStateStore(): JDurableStateStore[AnyRef]

183

}

184

```

185

186

### Example: User Profile Management

187

188

```scala

189

import akka.persistence.state.scaladsl.{DurableStateStore, DurableStateUpdateStore}

190

import akka.actor.ActorSystem

191

import akka.Done

192

import scala.concurrent.{Future, ExecutionContext}

193

import scala.util.{Success, Failure}

194

195

// Domain model

196

case class UserProfile(

197

userId: String,

198

name: String,

199

email: String,

200

preferences: Map[String, Any],

201

lastModified: Long

202

)

203

204

class UserProfileService(

205

stateStore: DurableStateUpdateStore[UserProfile]

206

)(implicit ec: ExecutionContext) {

207

208

def getProfile(userId: String): Future[Option[UserProfile]] = {

209

stateStore.getObject(s"user-profile-$userId").map(_.value)

210

}

211

212

def createProfile(profile: UserProfile): Future[Done] = {

213

val profileWithTimestamp = profile.copy(lastModified = System.currentTimeMillis())

214

stateStore.upsertObject(

215

persistenceId = s"user-profile-${profile.userId}",

216

revision = 0L, // New profile

217

value = profileWithTimestamp,

218

tag = ""

219

)

220

}

221

222

def updateProfile(userId: String, updater: UserProfile => UserProfile): Future[Option[Done]] = {

223

for {

224

current <- stateStore.getObject(s"user-profile-$userId")

225

result <- current.value match {

226

case Some(profile) =>

227

val updated = updater(profile).copy(lastModified = System.currentTimeMillis())

228

stateStore.upsertObject(

229

persistenceId = s"user-profile-$userId",

230

revision = current.revision,

231

value = updated,

232

tag = ""

233

).map(result => Some(result))

234

case None =>

235

Future.successful(None)

236

}

237

} yield result

238

}

239

240

def deleteProfile(userId: String): Future[Boolean] = {

241

for {

242

current <- stateStore.getObject(s"user-profile-$userId")

243

result <- current.value match {

244

case Some(_) =>

245

stateStore.deleteObject(

246

persistenceId = s"user-profile-$userId",

247

revision = current.revision

248

).map(_ => true)

249

case None =>

250

Future.successful(false)

251

}

252

} yield result

253

}

254

}

255

256

// Usage example

257

implicit val system: ActorSystem = ActorSystem("user-service")

258

implicit val ec: ExecutionContext = system.dispatcher

259

260

val stateStore: DurableStateUpdateStore[UserProfile] =

261

DurableStateStoreRegistry(system).durableStateStoreFor("my-state-store")

262

263

val userService = new UserProfileService(stateStore)

264

265

// Create new profile

266

val newProfile = UserProfile(

267

userId = "user123",

268

name = "John Doe",

269

email = "john@example.com",

270

preferences = Map("theme" -> "dark", "notifications" -> true),

271

lastModified = 0L

272

)

273

274

userService.createProfile(newProfile).onComplete {

275

case Success(_) => println("Profile created successfully")

276

case Failure(ex) => println(s"Failed to create profile: ${ex.getMessage}")

277

}

278

279

// Update profile

280

userService.updateProfile("user123", profile =>

281

profile.copy(preferences = profile.preferences + ("language" -> "en"))

282

).onComplete {

283

case Success(Some(_)) => println("Profile updated successfully")

284

case Success(None) => println("Profile not found")

285

case Failure(ex) => println(s"Failed to update profile: ${ex.getMessage}")

286

}

287

```

288

289

### Example: Configuration Management

290

291

```scala

292

import akka.persistence.state.scaladsl.DurableStateUpdateStore

293

import akka.Done

294

import scala.concurrent.{Future, ExecutionContext}

295

296

case class ApplicationConfig(

297

version: String,

298

features: Set[String],

299

settings: Map[String, Any],

300

environment: String

301

)

302

303

class ConfigurationManager(

304

stateStore: DurableStateUpdateStore[ApplicationConfig]

305

)(implicit ec: ExecutionContext) {

306

307

private val configId = "application-config"

308

309

def getCurrentConfig: Future[ApplicationConfig] = {

310

stateStore.getObject(configId).map { result =>

311

result.value.getOrElse(getDefaultConfig)

312

}

313

}

314

315

def updateConfig(config: ApplicationConfig): Future[Done] = {

316

for {

317

current <- stateStore.getObject(configId)

318

revision = current.revision

319

result <- stateStore.upsertObject(configId, revision, config, "")

320

} yield result

321

}

322

323

def enableFeature(feature: String): Future[Done] = {

324

updateConfigField { config =>

325

config.copy(features = config.features + feature)

326

}

327

}

328

329

def disableFeature(feature: String): Future[Done] = {

330

updateConfigField { config =>

331

config.copy(features = config.features - feature)

332

}

333

}

334

335

def updateSetting(key: String, value: Any): Future[Done] = {

336

updateConfigField { config =>

337

config.copy(settings = config.settings + (key -> value))

338

}

339

}

340

341

private def updateConfigField(updater: ApplicationConfig => ApplicationConfig): Future[Done] = {

342

for {

343

current <- getCurrentConfig

344

updated = updater(current)

345

result <- updateConfig(updated)

346

} yield result

347

}

348

349

private def getDefaultConfig: ApplicationConfig = {

350

ApplicationConfig(

351

version = "1.0.0",

352

features = Set.empty,

353

settings = Map.empty,

354

environment = "development"

355

)

356

}

357

}

358

```

359

360

### Error Handling and Optimistic Concurrency

361

362

```scala

363

import akka.persistence.state.{DurableStateStoreException, RevisionMismatchException}

364

365

class SafeStateManager[T](

366

stateStore: DurableStateUpdateStore[T]

367

)(implicit ec: ExecutionContext) {

368

369

def safeUpdate(

370

persistenceId: String,

371

updater: T => T,

372

maxRetries: Int = 3

373

): Future[Either[String, Long]] = {

374

375

def attemptUpdate(retryCount: Int): Future[Either[String, Long]] = {

376

for {

377

current <- stateStore.getObject(persistenceId)

378

result <- current.value match {

379

case Some(value) =>

380

val updated = updater(value)

381

stateStore.upsertObject(persistenceId, current.revision, updated, "")

382

.map(_ => Right(current.revision + 1))

383

.recover {

384

case _: RevisionMismatchException if retryCount < maxRetries =>

385

// Retry on revision mismatch (optimistic concurrency conflict)

386

Left("revision_mismatch")

387

case ex: DurableStateStoreException =>

388

Left(s"Store error: ${ex.getMessage}")

389

case ex =>

390

Left(s"Unexpected error: ${ex.getMessage}")

391

}

392

case None =>

393

Future.successful(Left("not_found"))

394

}

395

finalResult <- result match {

396

case Left("revision_mismatch") => attemptUpdate(retryCount + 1)

397

case other => Future.successful(other)

398

}

399

} yield finalResult

400

}

401

402

attemptUpdate(0)

403

}

404

}

405

```

406

407

### Configuration

408

409

Durable state stores are configured in application.conf:

410

411

```hocon

412

akka.persistence.state {

413

plugin = "akka.persistence.state.inmem"

414

415

# In-memory state store (for testing)

416

inmem {

417

class = "akka.persistence.state.InMemDurableStateStoreProvider"

418

}

419

420

# Custom state store plugin

421

my-state-store {

422

class = "com.example.MyDurableStateStoreProvider"

423

424

# Plugin-specific configuration

425

connection-string = "postgresql://localhost/mydb"

426

table-name = "durable_state"

427

}

428

}

429

```