0
# At-Least-Once Delivery
1
2
Reliable message delivery with automatic redelivery, confirmation tracking, and configurable retry policies.
3
4
## Capabilities
5
6
### AtLeastOnceDelivery Trait
7
8
Scala API for at-least-once delivery semantics.
9
10
```scala { .api }
11
/**
 * Scala API for at-least-once delivery semantics.
 *
 * Adds the `deliver` operations to a `PersistentActor`; confirmation tracking and
 * the redelivery settings are declared by `AtLeastOnceDeliveryLike`.
 */
trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike {

  /**
   * Deliver a message to an actor path with delivery confirmation tracking.
   *
   * @param destination         path of the actor that should receive the message
   * @param deliveryIdToMessage builds the outgoing message from the delivery ID it is
   *                            tracked under, so the ID can be embedded in the message
   */
  def deliver(destination: ActorPath)(deliveryIdToMessage: Long => Any): Unit

  /**
   * Deliver a message to an actor selection with delivery confirmation tracking.
   *
   * @param destination         selection of the actor that should receive the message
   * @param deliveryIdToMessage builds the outgoing message from the delivery ID it is
   *                            tracked under, so the ID can be embedded in the message
   */
  def deliver(destination: ActorSelection)(deliveryIdToMessage: Long => Any): Unit
}
22
```
23
24
### AtLeastOnceDeliveryLike Core Functionality
25
26
Core at-least-once delivery functionality and configuration.
27
28
```scala { .api }
29
/**
 * Core at-least-once delivery functionality: confirmation tracking, delivery-state
 * snapshots and the redelivery settings, each of which can be overridden per actor.
 */
trait AtLeastOnceDeliveryLike extends Eventsourced {

  /**
   * Confirm successful delivery of the message with the given delivery ID.
   *
   * @param deliveryId ID that was passed to the message factory given to `deliver`
   * @return `true` if the delivery was confirmed by this call, `false` if it was
   *         already confirmed or is not known
   */
  def confirmDelivery(deliveryId: Long): Boolean

  /** Number of deliveries that have not been confirmed yet. */
  def numberOfUnconfirmed: Int

  /** Snapshot of the current delivery state, suitable for storing in an actor snapshot. */
  def getDeliverySnapshot: AtLeastOnceDeliverySnapshot

  /** Restore the delivery state from a snapshot obtained via `getDeliverySnapshot`. */
  def setDeliverySnapshot(snapshot: AtLeastOnceDeliverySnapshot): Unit

  // Configuration properties with defaults

  /** Interval between redelivery attempts. */
  def redeliverInterval: FiniteDuration = 5.seconds

  /** Maximum number of messages to redeliver in a single burst. */
  def redeliveryBurstLimit: Int = 10000

  /** Number of unconfirmed delivery attempts after which a warning is issued. */
  def warnAfterNumberOfUnconfirmedAttempts: Int = 5

  /** Maximum number of unconfirmed messages allowed. */
  def maxUnconfirmedMessages: Int = 100000
}
59
```
60
61
### AbstractPersistentActorWithAtLeastOnceDelivery (Java API)
62
63
Java API for at-least-once delivery.
64
65
```scala { .api }
66
/**
 * Java API for at-least-once delivery.
 *
 * Base class counterpart of the Scala `AtLeastOnceDelivery` trait; the message
 * factory is passed as a function object instead of a second parameter list.
 */
abstract class AbstractPersistentActorWithAtLeastOnceDelivery
    extends AbstractPersistentActor with AtLeastOnceDeliveryLike {

  /**
   * Deliver a message to an actor path (Java API).
   *
   * @param destination         path of the actor that should receive the message
   * @param deliveryIdToMessage builds the outgoing message from its delivery ID
   */
  def deliver(destination: ActorPath, deliveryIdToMessage: Function[java.lang.Long, Any]): Unit

  /**
   * Deliver a message to an actor selection (Java API).
   *
   * @param destination         selection of the actor that should receive the message
   * @param deliveryIdToMessage builds the outgoing message from its delivery ID
   */
  def deliver(destination: ActorSelection, deliveryIdToMessage: Function[java.lang.Long, Any]): Unit
}
78
```
79
80
### Delivery State Management
81
82
#### AtLeastOnceDeliverySnapshot
83
84
Snapshot representation of delivery state for persistence.
85
86
```scala { .api }
87
/**
 * Snapshot of at-least-once delivery state.
 *
 * @param currentDeliveryId     delivery ID counter at the time the snapshot was taken
 * @param unconfirmedDeliveries deliveries that were still unconfirmed at that time
 */
case class AtLeastOnceDeliverySnapshot(
  currentDeliveryId: Long,
  unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]
) {

  /** Java API: the unconfirmed deliveries as a `java.util.List`. */
  def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] =
    unconfirmedDeliveries.asJava
}
98
```
99
100
#### UnconfirmedDelivery
101
102
Information about pending delivery attempts.
103
104
```scala { .api }
105
/**
 * Information about an unconfirmed delivery attempt.
 *
 * @param deliveryId  ID the delivery is tracked under
 * @param destination path of the actor the message was sent to
 * @param message     the message awaiting confirmation
 */
case class UnconfirmedDelivery(
  deliveryId: Long,
  destination: ActorPath,
  message: Any
) {

  /** Java API: the message, typed as `AnyRef` for Java callers. */
  def getMessage(): AnyRef = message.asInstanceOf[AnyRef]
}
116
```
117
118
### Delivery Warnings and Exceptions
119
120
#### UnconfirmedWarning
121
122
Warning message sent to the actor itself when deliveries are still unconfirmed after `warnAfterNumberOfUnconfirmedAttempts` delivery attempts.
123
124
```scala { .api }
125
/**
 * Warning about unconfirmed deliveries, sent to self.
 *
 * @param unconfirmedDeliveries the deliveries that are still awaiting confirmation
 */
case class UnconfirmedWarning(
  unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]
) {

  /** Java API: the unconfirmed deliveries as a `java.util.List`. */
  def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] =
    unconfirmedDeliveries.asJava
}
135
```
136
137
#### MaxUnconfirmedMessagesExceededException
138
139
Exception thrown when the maximum number of unconfirmed messages (`maxUnconfirmedMessages`) is exceeded.
140
141
```scala { .api }
142
/**
 * Exception raised when the limit on unconfirmed messages is exceeded.
 *
 * @param message description of the failure, exposed through `getMessage`
 */
class MaxUnconfirmedMessagesExceededException(message: String)
  extends RuntimeException(message)
147
```
148
149
### Example: Basic At-Least-Once Delivery
150
151
```scala
152
import akka.persistence._
153
import akka.actor.ActorPath
154
import scala.concurrent.duration._
155
156
// Messages

/** Command (also persisted as the event): send the order `orderId` to `destination`. */
case class SendOrder(orderId: String, destination: ActorPath)

/** Message delivered to the destination; carries the delivery ID to confirm with. */
case class OrderMessage(orderId: String, deliveryId: Long)

/** Confirmation for the delivery identified by `deliveryId`. */
case class OrderConfirmation(deliveryId: Long)
160
161
/**
 * Persistent actor that sends each order to its destination with at-least-once delivery.
 *
 * `SendOrder` and `OrderConfirmation` are persisted as events and replayed on
 * recovery, where the same `deliver` / `confirmDelivery` calls are repeated so the
 * set of unconfirmed deliveries is rebuilt.
 */
class OrderProcessor extends PersistentActor with AtLeastOnceDelivery {
  override def persistenceId: String = "order-processor"

  // Configure delivery settings
  override def redeliverInterval: FiniteDuration = 30.seconds
  override def maxUnconfirmedMessages: Int = 10000
  override def warnAfterNumberOfUnconfirmedAttempts: Int = 3

  override def receiveCommand: Receive = {
    case SendOrder(orderId, destination) =>
      // Deliver only after the event is stored, so the delivery can be replayed.
      persist(SendOrder(orderId, destination)) { _ =>
        sendOrder(orderId, destination)
      }

    case OrderConfirmation(deliveryId) =>
      persist(OrderConfirmation(deliveryId)) { _ =>
        if (confirmDelivery(deliveryId)) {
          println(s"Confirmed delivery $deliveryId")
        } else {
          println(s"Delivery $deliveryId was already confirmed or not found")
        }
      }

    case UnconfirmedWarning(unconfirmed) =>
      println(s"Warning: ${unconfirmed.size} unconfirmed deliveries")
      unconfirmed.foreach { delivery =>
        println(s" Delivery ${delivery.deliveryId} to ${delivery.destination}")
      }
  }

  override def receiveRecover: Receive = {
    case SendOrder(orderId, destination) =>
      sendOrder(orderId, destination)

    case OrderConfirmation(deliveryId) =>
      confirmDelivery(deliveryId)
  }

  /** Delivers an `OrderMessage` for `orderId`, retried automatically until confirmed. */
  private def sendOrder(orderId: String, destination: ActorPath): Unit =
    deliver(destination) { deliveryId =>
      OrderMessage(orderId, deliveryId)
    }
}
205
```
206
207
### Example: Order Fulfillment System
208
209
```scala
210
import akka.persistence._
211
import akka.actor.{ActorRef, ActorPath}
212
213
// Events for persistence

/** Base type of the events defined for the order fulfillment example. */
sealed trait OrderEvent

/** An order with the given items was accepted. */
case class OrderReceived(orderId: String, items: List[String]) extends OrderEvent

/** Order `orderId` was sent to the warehouse at `warehousePath` under `deliveryId`. */
case class OrderSentToWarehouse(orderId: String, warehousePath: ActorPath, deliveryId: Long) extends OrderEvent

/** Order `orderId` was sent to billing at `billingPath` under `deliveryId`. */
case class OrderSentToBilling(orderId: String, billingPath: ActorPath, deliveryId: Long) extends OrderEvent

/** The warehouse confirmed the delivery identified by `deliveryId`. */
case class WarehouseConfirmed(deliveryId: Long) extends OrderEvent

/** Billing confirmed the delivery identified by `deliveryId`. */
case class BillingConfirmed(deliveryId: Long) extends OrderEvent
220
221
// Commands

/** Request to process the order `orderId` consisting of `items`. */
case class ProcessOrder(orderId: String, items: List[String])

/** Acknowledgement from the warehouse for the delivery `deliveryId`. */
case class WarehouseAck(deliveryId: Long)

/** Acknowledgement from billing for the delivery `deliveryId`. */
case class BillingAck(deliveryId: Long)

// Messages sent to other services

/** Asks the warehouse to fulfill an order; `deliveryId` is the ID to acknowledge. */
case class FulfillOrder(orderId: String, items: List[String], deliveryId: Long)

/** Asks billing to charge `amount` for an order; `deliveryId` is the ID to acknowledge. */
case class ProcessPayment(orderId: String, amount: BigDecimal, deliveryId: Long)
229
230
/**
 * Persistent actor that forwards every accepted order to the warehouse and billing
 * services with at-least-once delivery.
 *
 * Only `OrderReceived`, `WarehouseConfirmed` and `BillingConfirmed` are persisted.
 * Both deliveries are started from the `OrderReceived` handler, which runs the same
 * way for live commands and during recovery. The message factory passed to `deliver`
 * must return the message itself; calling `persist` inside it would make the factory
 * return `Unit`, so the destination would receive `()` instead of the message.
 */
class OrderFulfillmentProcessor extends PersistentActor with AtLeastOnceDelivery {
  // Needed for `10.seconds` and `FiniteDuration`; the snippet's imports do not provide them.
  import scala.concurrent.duration._

  override def persistenceId: String = "order-fulfillment"

  // Service endpoints
  val warehousePath: ActorPath = ActorPath.fromString("akka://system/user/warehouse")
  val billingPath: ActorPath = ActorPath.fromString("akka://system/user/billing")

  // Configure delivery behavior
  override def redeliverInterval: FiniteDuration = 10.seconds
  override def redeliveryBurstLimit: Int = 100
  override def warnAfterNumberOfUnconfirmedAttempts: Int = 5
  override def maxUnconfirmedMessages: Int = 1000

  var orders = Map.empty[String, OrderState]

  override def receiveCommand: Receive = {
    case ProcessOrder(orderId, items) =>
      // Start the deliveries only once the order is safely stored.
      persist(OrderReceived(orderId, items)) { _ =>
        registerOrder(orderId, items)
      }

    case WarehouseAck(deliveryId) =>
      persist(WarehouseConfirmed(deliveryId)) { _ =>
        if (confirmDelivery(deliveryId)) {
          updateOrderStatus(deliveryId, warehouseComplete = true)
        }
      }

    case BillingAck(deliveryId) =>
      persist(BillingConfirmed(deliveryId)) { _ =>
        if (confirmDelivery(deliveryId)) {
          updateOrderStatus(deliveryId, billingComplete = true)
        }
      }

    case UnconfirmedWarning(unconfirmed) =>
      println(s"${unconfirmed.size} unconfirmed deliveries:")
      unconfirmed.groupBy(_.destination).foreach { case (dest, deliveries) =>
        println(s" ${dest.name}: ${deliveries.size} pending")
      }

    case "status" =>
      sender() ! DeliveryStatus(numberOfUnconfirmed, orders.size)
  }

  override def receiveRecover: Receive = {
    case OrderReceived(orderId, items) =>
      // Same calls, in the same order, as the command side.
      registerOrder(orderId, items)

    case WarehouseConfirmed(deliveryId) =>
      confirmDelivery(deliveryId)

    case BillingConfirmed(deliveryId) =>
      confirmDelivery(deliveryId)

    case _: OrderSentToWarehouse | _: OrderSentToBilling =>
      // Not written by this actor any more; accepted and ignored so journals that
      // still contain these events keep replaying. The deliveries they describe are
      // re-created from the preceding OrderReceived event.
      ()
  }

  // Helper methods

  /**
   * Records the order as pending and starts both deliveries, warehouse first.
   * Each outgoing message embeds the delivery ID handed to its factory by `deliver`.
   */
  private def registerOrder(orderId: String, items: List[String]): Unit = {
    orders += orderId -> OrderState(orderId, items, warehousePending = true, billingPending = true)

    // Send to warehouse
    deliver(warehousePath) { deliveryId =>
      FulfillOrder(orderId, items, deliveryId)
    }

    // Send to billing
    deliver(billingPath) { deliveryId =>
      ProcessPayment(orderId, calculateTotal(items), deliveryId)
    }
  }

  /** Placeholder for per-order bookkeeping after a confirmed delivery (intentionally empty). */
  private def updateOrderStatus(deliveryId: Long, warehouseComplete: Boolean = false, billingComplete: Boolean = false): Unit = {
    // Find order by scanning unconfirmed deliveries or maintaining lookup table
    // Update order completion status
    // Send completion notification if both warehouse and billing are done
  }

  /** Order total; simplified to a flat 10 per item. */
  private def calculateTotal(items: List[String]): BigDecimal = {
    // Calculate order total
    BigDecimal(items.size * 10) // Simplified
  }
}
320
321
/**
 * Per-order bookkeeping kept by the fulfillment processor.
 *
 * @param orderId          identifier of the order
 * @param items            items contained in the order
 * @param warehousePending whether the warehouse step is still outstanding
 * @param billingPending   whether the billing step is still outstanding
 */
case class OrderState(
  orderId: String,
  items: List[String],
  warehousePending: Boolean,
  billingPending: Boolean
)

/** Reply to the `"status"` query: unconfirmed delivery count and number of tracked orders. */
case class DeliveryStatus(unconfirmedCount: Int, activeOrders: Int)
329
```
330
331
### Snapshot Integration
332
333
```scala
334
/**
 * Shows how to store the delivery state together with business state in one snapshot
 * and how to restore both from a `SnapshotOffer` during recovery.
 */
class SnapshotAwareDeliveryActor extends PersistentActor with AtLeastOnceDelivery {
  override def persistenceId: String = "snapshot-delivery"

  var businessState = Map.empty[String, String]

  override def receiveCommand: Receive = {
    case "snapshot" =>
      // Save both business state and delivery state
      saveSnapshot(CombinedSnapshot(businessState, getDeliverySnapshot))

    case SaveSnapshotSuccess(metadata) =>
      println(s"Snapshot saved at sequence ${metadata.sequenceNr}")

    case _ => // Handle other messages
  }

  override def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: CombinedSnapshot) =>
      businessState = snapshot.businessState
      // Restores the delivery state captured alongside the business state.
      setDeliverySnapshot(snapshot.deliverySnapshot)

    case _ => // Handle other recovery events
  }
}
359
360
/**
 * Snapshot payload that bundles the actor's business state with its delivery state.
 *
 * @param businessState    application state of the actor
 * @param deliverySnapshot delivery state as returned by `getDeliverySnapshot`
 */
case class CombinedSnapshot(
  businessState: Map[String, String],
  deliverySnapshot: AtLeastOnceDeliverySnapshot
)
364
```
365
366
### Configuration
367
368
At-least-once delivery settings can be configured in application.conf:
369
370
```hocon
371
akka.persistence.at-least-once-delivery {
372
# Interval between redelivery attempts
373
redeliver-interval = 5s
374
375
# Number of messages sent in one redelivery burst
376
redelivery-burst-limit = 10000
377
378
# After this number of delivery attempts an UnconfirmedWarning message is sent to the actor itself
379
warn-after-number-of-unconfirmed-attempts = 5
380
381
# Maximum number of unconfirmed messages
382
max-unconfirmed-messages = 100000
383
}
384
```