or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

at-least-once-delivery.mddurable-state.mdevent-adapters.mdindex.mdjournal-api.mdpersistent-actors.mdplugin-development.mdsnapshots.md

at-least-once-delivery.mddocs/

0

# At-Least-Once Delivery

1

2

Reliable message delivery with automatic redelivery, confirmation tracking, and configurable retry policies.

3

4

## Capabilities

5

6

### AtLeastOnceDelivery Trait

7

8

Scala API for at-least-once delivery semantics.

9

10

```scala { .api }

11

/**

12

* Scala API for at-least-once delivery semantics

13

*/

14

trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike {

15

16

/** Deliver message to actor path with delivery confirmation tracking */

17

def deliver(destination: ActorPath)(deliveryIdToMessage: Long => Any): Unit

18

19

/** Deliver message to actor selection with delivery confirmation tracking */

20

def deliver(destination: ActorSelection)(deliveryIdToMessage: Long => Any): Unit

21

}

22

```

23

24

### AtLeastOnceDeliveryLike Core Functionality

25

26

Core at-least-once delivery functionality and configuration.

27

28

```scala { .api }

29

/**

30

* Core at-least-once delivery functionality

31

*/

32

trait AtLeastOnceDeliveryLike extends Eventsourced {

33

34

/** Confirm successful delivery of message by delivery ID */

35

def confirmDelivery(deliveryId: Long): Boolean

36

37

/** Get count of unconfirmed deliveries */

38

def numberOfUnconfirmed: Int

39

40

/** Get snapshot of current delivery state */

41

def getDeliverySnapshot: AtLeastOnceDeliverySnapshot

42

43

/** Restore delivery state from snapshot */

44

def setDeliverySnapshot(snapshot: AtLeastOnceDeliverySnapshot): Unit

45

46

// Configuration properties with defaults

47

/** Interval between redelivery attempts */

48

def redeliverInterval: FiniteDuration = 5.seconds

49

50

/** Maximum number of messages to redeliver in single burst */

51

def redeliveryBurstLimit: Int = 10000

52

53

/** Warn after this many unconfirmed delivery attempts */

54

def warnAfterNumberOfUnconfirmedAttempts: Int = 5

55

56

/** Maximum number of unconfirmed messages allowed */

57

def maxUnconfirmedMessages: Int = 100000

58

}

59

```

60

61

### AbstractPersistentActorWithAtLeastOnceDelivery (Java API)

62

63

Java API for at-least-once delivery.

64

65

```scala { .api }

66

/**

67

* Java API for at-least-once delivery

68

*/

69

abstract class AbstractPersistentActorWithAtLeastOnceDelivery

70

extends AbstractPersistentActor with AtLeastOnceDeliveryLike {

71

72

/** Deliver message to actor path (Java API) */

73

def deliver(destination: ActorPath, deliveryIdToMessage: Function[java.lang.Long, Any]): Unit

74

75

/** Deliver message to actor selection (Java API) */

76

def deliver(destination: ActorSelection, deliveryIdToMessage: Function[java.lang.Long, Any]): Unit

77

}

78

```

79

80

### Delivery State Management

81

82

#### AtLeastOnceDeliverySnapshot

83

84

Snapshot representation of delivery state for persistence.

85

86

```scala { .api }

87

/**

88

* Snapshot of at-least-once delivery state

89

*/

90

case class AtLeastOnceDeliverySnapshot(

91

currentDeliveryId: Long,

92

unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]

93

) {

94

/** Java API to get unconfirmed deliveries */

95

def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] =

96

unconfirmedDeliveries.asJava

97

}

98

```

99

100

#### UnconfirmedDelivery

101

102

Information about pending delivery attempts.

103

104

```scala { .api }

105

/**

106

* Information about an unconfirmed delivery attempt

107

*/

108

case class UnconfirmedDelivery(

109

deliveryId: Long,

110

destination: ActorPath,

111

message: Any

112

) {

113

/** Java API to get message */

114

def getMessage(): AnyRef = message.asInstanceOf[AnyRef]

115

}

116

```

117

118

### Delivery Warnings and Exceptions

119

120

#### UnconfirmedWarning

121

122

Warning message sent when deliveries remain unconfirmed.

123

124

```scala { .api }

125

/**

126

* Warning about unconfirmed deliveries sent to self

127

*/

128

case class UnconfirmedWarning(

129

unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]

130

) {

131

/** Java API to get unconfirmed deliveries */

132

def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] =

133

unconfirmedDeliveries.asJava

134

}

135

```

136

137

#### MaxUnconfirmedMessagesExceededException

138

139

Exception thrown when maximum unconfirmed message limit is exceeded.

140

141

```scala { .api }

142

/**

143

* Exception when max unconfirmed messages limit is exceeded

144

*/

145

class MaxUnconfirmedMessagesExceededException(message: String)

146

extends RuntimeException(message)

147

```

148

149

### Example: Basic At-Least-Once Delivery

150

151

```scala

152

import akka.persistence._

153

import akka.actor.ActorPath

154

import scala.concurrent.duration._

155

156

// Messages

157

case class SendOrder(orderId: String, destination: ActorPath)

158

case class OrderMessage(orderId: String, deliveryId: Long)

159

case class OrderConfirmation(deliveryId: Long)

160

161

/**
 * Persistent actor that forwards orders with at-least-once delivery.
 *
 * A `SendOrder` command is persisted as an event and then handed to `deliver`
 * for the requested destination. An `OrderConfirmation` is persisted and then
 * passed to `confirmDelivery`. Recovery replays both events through the same
 * `deliver` / `confirmDelivery` calls.
 */
class OrderProcessor extends PersistentActor with AtLeastOnceDelivery {
  override def persistenceId: String = "order-processor"

  // Delivery settings overridden for this actor
  override def redeliverInterval: FiniteDuration = 30.seconds
  override def maxUnconfirmedMessages: Int = 10000
  override def warnAfterNumberOfUnconfirmedAttempts: Int = 3

  /** Starts a tracked delivery of the order; the delivery id is embedded in the message. */
  private def dispatch(orderId: String, destination: ActorPath): Unit =
    deliver(destination)(deliveryId => OrderMessage(orderId, deliveryId))

  override def receiveCommand: Receive = {
    case SendOrder(orderId, destination) =>
      // Persist first; the delivery (with automatic retry until confirmed)
      // starts only from the persist callback.
      persist(SendOrder(orderId, destination))(_ => dispatch(orderId, destination))

    case OrderConfirmation(deliveryId) =>
      persist(OrderConfirmation(deliveryId)) { _ =>
        // confirmDelivery reports whether this id was still pending
        val outcome =
          if (confirmDelivery(deliveryId)) s"Confirmed delivery $deliveryId"
          else s"Delivery $deliveryId was already confirmed or not found"
        println(outcome)
      }

    case UnconfirmedWarning(unconfirmed) =>
      println(s"Warning: ${unconfirmed.size} unconfirmed deliveries")
      for (delivery <- unconfirmed)
        println(s" Delivery ${delivery.deliveryId} to ${delivery.destination}")
  }

  override def receiveRecover: Receive = {
    case SendOrder(orderId, destination) =>
      dispatch(orderId, destination)

    case OrderConfirmation(deliveryId) =>
      confirmDelivery(deliveryId)
  }
}

205

```

206

207

### Example: Order Fulfillment System

208

209

```scala

210

import akka.persistence._

211

import akka.actor.{ActorRef, ActorPath}

212

213

// Events for persistence

214

sealed trait OrderEvent

215

case class OrderReceived(orderId: String, items: List[String]) extends OrderEvent

216

case class OrderSentToWarehouse(orderId: String, warehousePath: ActorPath, deliveryId: Long) extends OrderEvent

217

case class OrderSentToBilling(orderId: String, billingPath: ActorPath, deliveryId: Long) extends OrderEvent

218

case class WarehouseConfirmed(deliveryId: Long) extends OrderEvent

219

case class BillingConfirmed(deliveryId: Long) extends OrderEvent

220

221

// Commands

222

case class ProcessOrder(orderId: String, items: List[String])

223

case class WarehouseAck(deliveryId: Long)

224

case class BillingAck(deliveryId: Long)

225

226

// Messages sent to other services

227

case class FulfillOrder(orderId: String, items: List[String], deliveryId: Long)

228

case class ProcessPayment(orderId: String, amount: BigDecimal, deliveryId: Long)

229

230

/**
 * Persistent actor that sends every accepted order to the warehouse and the
 * billing service using at-least-once delivery.
 *
 * An accepted order is persisted as `OrderReceived`; the same `acceptOrder`
 * step is then run both from the persist callback and when that event is
 * replayed, so `deliver` is invoked in the same order in both cases.
 * Acknowledgements are persisted as `WarehouseConfirmed` / `BillingConfirmed`
 * and applied with `confirmDelivery`.
 */
class OrderFulfillmentProcessor extends PersistentActor with AtLeastOnceDelivery {
  // Needed for `10.seconds` / `FiniteDuration`; this example's import list
  // does not include the duration package.
  import scala.concurrent.duration._

  override def persistenceId: String = "order-fulfillment"

  // Service endpoints
  val warehousePath: ActorPath = ActorPath.fromString("akka://system/user/warehouse")
  val billingPath: ActorPath = ActorPath.fromString("akka://system/user/billing")

  // Configure delivery behavior
  override def redeliverInterval: FiniteDuration = 10.seconds
  override def redeliveryBurstLimit: Int = 100
  override def warnAfterNumberOfUnconfirmedAttempts: Int = 5
  override def maxUnconfirmedMessages: Int = 1000

  var orders = Map.empty[String, OrderState]

  override def receiveCommand: Receive = {
    case ProcessOrder(orderId, items) =>
      persist(OrderReceived(orderId, items)) { _ =>
        acceptOrder(orderId, items)
      }

    case WarehouseAck(deliveryId) =>
      persist(WarehouseConfirmed(deliveryId)) { _ =>
        // Only update the order when this id was still pending
        if (confirmDelivery(deliveryId)) {
          updateOrderStatus(deliveryId, warehouseComplete = true)
        }
      }

    case BillingAck(deliveryId) =>
      persist(BillingConfirmed(deliveryId)) { _ =>
        if (confirmDelivery(deliveryId)) {
          updateOrderStatus(deliveryId, billingComplete = true)
        }
      }

    case UnconfirmedWarning(unconfirmed) =>
      println(s"${unconfirmed.size} unconfirmed deliveries:")
      unconfirmed.groupBy(_.destination).foreach { case (dest, deliveries) =>
        println(s" ${dest.name}: ${deliveries.size} pending")
      }

    case "status" =>
      sender() ! DeliveryStatus(numberOfUnconfirmed, orders.size)
  }

  override def receiveRecover: Receive = {
    case OrderReceived(orderId, items) =>
      acceptOrder(orderId, items)

    // These events are no longer written by this actor: the delivery id is
    // only known inside `deliver`, and both deliveries are already re-created
    // from OrderReceived above. They are matched here so that a journal which
    // may still contain them replays without unhandled events.
    case _: OrderSentToWarehouse | _: OrderSentToBilling =>
      ()

    case WarehouseConfirmed(deliveryId) =>
      confirmDelivery(deliveryId)

    case BillingConfirmed(deliveryId) =>
      confirmDelivery(deliveryId)
  }

  // Helper methods

  /**
   * Records the order as pending on both services and starts one tracked
   * delivery to each of them.
   *
   * The function given to `deliver` must return the message to send, so it
   * builds the message directly from the supplied delivery id. Calling
   * `persist` there would make the function's result `Unit`.
   *
   * @param orderId identifier of the accepted order
   * @param items   items of the order, forwarded to the warehouse and used to
   *                compute the billed amount
   */
  private def acceptOrder(orderId: String, items: List[String]): Unit = {
    orders += orderId -> OrderState(orderId, items, warehousePending = true, billingPending = true)

    // Send to warehouse
    deliver(warehousePath) { deliveryId =>
      FulfillOrder(orderId, items, deliveryId)
    }

    // Send to billing
    deliver(billingPath) { deliveryId =>
      ProcessPayment(orderId, calculateTotal(items), deliveryId)
    }
  }

  /** Placeholder for per-order bookkeeping after a confirmed delivery; intentionally empty. */
  private def updateOrderStatus(
      deliveryId: Long,
      warehouseComplete: Boolean = false,
      billingComplete: Boolean = false
  ): Unit = {
    // Find order by scanning unconfirmed deliveries or maintaining lookup table
    // Update order completion status
    // Send completion notification if both warehouse and billing are done
  }

  /** Simplified order total: a flat 10 per item. */
  private def calculateTotal(items: List[String]): BigDecimal = {
    // Calculate order total
    BigDecimal(items.size * 10) // Simplified
  }
}

320

321

case class OrderState(

322

orderId: String,

323

items: List[String],

324

warehousePending: Boolean,

325

billingPending: Boolean

326

)

327

328

case class DeliveryStatus(unconfirmedCount: Int, activeOrders: Int)

329

```

330

331

### Snapshot Integration

332

333

```scala

334

/**
 * Persistent actor whose snapshots carry the delivery state alongside its own
 * business state.
 *
 * On the "snapshot" command it saves a `CombinedSnapshot` built from
 * `businessState` and `getDeliverySnapshot`; when such a snapshot is offered
 * during recovery, both parts are restored.
 */
class SnapshotAwareDeliveryActor extends PersistentActor with AtLeastOnceDelivery {
  override def persistenceId: String = "snapshot-delivery"

  var businessState = Map.empty[String, String]

  override def receiveCommand: Receive = {
    case "snapshot" =>
      // One snapshot holds both the business state and the delivery state
      saveSnapshot(CombinedSnapshot(businessState, getDeliverySnapshot))

    case SaveSnapshotSuccess(metadata) =>
      println(s"Snapshot saved at sequence ${metadata.sequenceNr}")

    case _ => // Handle other messages
  }

  override def receiveRecover: Receive = {
    case SnapshotOffer(_, CombinedSnapshot(savedBusinessState, savedDeliverySnapshot)) =>
      businessState = savedBusinessState
      // Hand the saved delivery state back to the delivery trait
      setDeliverySnapshot(savedDeliverySnapshot)

    case _ => // Handle other recovery events
  }
}

359

360

case class CombinedSnapshot(

361

businessState: Map[String, String],

362

deliverySnapshot: AtLeastOnceDeliverySnapshot

363

)

364

```

365

366

### Configuration

367

368

At-least-once delivery settings can be configured in application.conf:

369

370

```hocon

371

akka.persistence.at-least-once-delivery {

372

# Interval between redelivery attempts

373

redeliver-interval = 5s

374

375

# Number of messages sent in one redelivery burst

376

redelivery-burst-limit = 10000

377

378

# After this number of delivery attempts an UnconfirmedWarning message is sent to the actor itself

379

warn-after-number-of-unconfirmed-attempts = 5

380

381

# Maximum number of unconfirmed messages

382

max-unconfirmed-messages = 100000

383

}

384

```