Akka Persistence is an event sourcing library for building persistent, fault-tolerant applications with the actor model.
—
Reliable message delivery with automatic redelivery, confirmation tracking, and configurable retry policies.
Scala API for at-least-once delivery semantics.
/**
 * Scala API for at-least-once delivery semantics.
 *
 * Mixed into a `PersistentActor`. Messages are sent through `deliver`, which
 * hands the caller a delivery id to embed in the outgoing message; the
 * delivery is later acknowledged with `confirmDelivery` (declared on
 * `AtLeastOnceDeliveryLike`) using that same id.
 */
trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike {
  /**
   * Deliver message to actor path with delivery confirmation tracking.
   *
   * @param destination         path of the actor that should receive the message
   * @param deliveryIdToMessage function that receives the delivery id for this
   *                            send and returns the message to deliver
   */
  def deliver(destination: ActorPath)(deliveryIdToMessage: Long => Any): Unit
  /**
   * Deliver message to actor selection with delivery confirmation tracking.
   *
   * @param destination         selection of the actor(s) that should receive the message
   * @param deliveryIdToMessage function that receives the delivery id for this
   *                            send and returns the message to deliver
   */
  def deliver(destination: ActorSelection)(deliveryIdToMessage: Long => Any): Unit
}
Core at-least-once delivery functionality and configuration.
/**
 * Core at-least-once delivery functionality.
 *
 * Shared by the Scala (`AtLeastOnceDelivery`) and Java
 * (`AbstractPersistentActorWithAtLeastOnceDelivery`) APIs: confirmation,
 * inspection of pending deliveries, snapshot support and the tunable
 * redelivery settings.
 */
trait AtLeastOnceDeliveryLike extends Eventsourced {
  /**
   * Confirm successful delivery of message by delivery ID.
   *
   * @param deliveryId id that was passed to the `deliver` callback for the message
   * @return Boolean outcome of the confirmation; the usage example in this
   *         document treats `false` as "already confirmed or not found"
   */
  def confirmDelivery(deliveryId: Long): Boolean
  /** Get count of unconfirmed deliveries */
  def numberOfUnconfirmed: Int
  /**
   * Get snapshot of current delivery state.
   *
   * The returned value can be stored alongside application state and handed
   * back to `setDeliverySnapshot` during recovery.
   */
  def getDeliverySnapshot: AtLeastOnceDeliverySnapshot
  /**
   * Restore delivery state from snapshot.
   *
   * @param snapshot state previously obtained from `getDeliverySnapshot`
   */
  def setDeliverySnapshot(snapshot: AtLeastOnceDeliverySnapshot): Unit
  // Configuration properties with defaults
  // NOTE(review): the literal defaults below match the values listed under
  // `akka.persistence.at-least-once-delivery` in the configuration section of
  // this document; confirm whether the library reads them from configuration.
  /** Interval between redelivery attempts */
  def redeliverInterval: FiniteDuration = 5.seconds
  /** Maximum number of messages to redeliver in single burst */
  def redeliveryBurstLimit: Int = 10000
  /** Warn after this many unconfirmed delivery attempts */
  def warnAfterNumberOfUnconfirmedAttempts: Int = 5
  /** Maximum number of unconfirmed messages allowed */
  def maxUnconfirmedMessages: Int = 100000
}
Java API for at-least-once delivery.
/**
 * Java API for at-least-once delivery.
 *
 * Abstract base class counterpart of the Scala `AtLeastOnceDelivery` trait;
 * `deliver` takes both arguments in a single parameter list and the callback
 * receives a boxed `java.lang.Long` delivery id.
 */
abstract class AbstractPersistentActorWithAtLeastOnceDelivery
  extends AbstractPersistentActor with AtLeastOnceDeliveryLike {
  /**
   * Deliver message to actor path (Java API).
   *
   * @param destination         path of the actor that should receive the message
   * @param deliveryIdToMessage function that receives the delivery id and
   *                            returns the message to deliver
   */
  def deliver(destination: ActorPath, deliveryIdToMessage: Function[java.lang.Long, Any]): Unit
  /**
   * Deliver message to actor selection (Java API).
   *
   * @param destination         selection of the actor(s) that should receive the message
   * @param deliveryIdToMessage function that receives the delivery id and
   *                            returns the message to deliver
   */
  def deliver(destination: ActorSelection, deliveryIdToMessage: Function[java.lang.Long, Any]): Unit
}
Snapshot representation of delivery state for persistence.
/**
 * Snapshot of at-least-once delivery state.
 *
 * Produced by `getDeliverySnapshot` and consumed by `setDeliverySnapshot`.
 *
 * @param currentDeliveryId     delivery id value captured in the snapshot
 *                              (presumably the latest id handed out; confirm
 *                              against the library documentation)
 * @param unconfirmedDeliveries deliveries that had not been confirmed when
 *                              the snapshot was taken
 */
case class AtLeastOnceDeliverySnapshot(
  currentDeliveryId: Long,
  unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]
) {
  /** Java API to get unconfirmed deliveries as a `java.util.List` view of `unconfirmedDeliveries` */
  def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] =
    unconfirmedDeliveries.asJava
}
Information about pending delivery attempts.
/**
 * Information about an unconfirmed delivery attempt.
 *
 * @param deliveryId  id under which the delivery is tracked
 * @param destination path of the actor the message is addressed to
 * @param message     the message being delivered
 */
case class UnconfirmedDelivery(
  deliveryId: Long,
  destination: ActorPath,
  message: Any
) {
  /** Java API to get message; returns `message` cast to `AnyRef` */
  def getMessage(): AnyRef = message.asInstanceOf[AnyRef]
}
Warning message sent when deliveries remain unconfirmed.
/**
 * Warning about unconfirmed deliveries sent to self.
 *
 * Handled like any other command in `receiveCommand` (see the usage examples
 * in this document).
 *
 * @param unconfirmedDeliveries the deliveries the warning is about
 */
case class UnconfirmedWarning(
  unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]
) {
  /** Java API to get unconfirmed deliveries as a `java.util.List` view of `unconfirmedDeliveries` */
  def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] =
    unconfirmedDeliveries.asJava
}
Exception thrown when maximum unconfirmed message limit is exceeded.
/**
 * Exception when max unconfirmed messages limit is exceeded.
 *
 * @param message description of the failure, forwarded to `RuntimeException`
 */
class MaxUnconfirmedMessagesExceededException(message: String)
  extends RuntimeException(message)
import akka.persistence._
import akka.actor.ActorPath
import scala.concurrent.duration._
// Messages
/** Command (also persisted as the event) asking the processor to send `orderId` to `destination`. */
case class SendOrder(orderId: String, destination: ActorPath)
/** Message delivered to the destination; carries the delivery id the receiver must echo back. */
case class OrderMessage(orderId: String, deliveryId: Long)
/** Acknowledgement from the destination for the delivery identified by `deliveryId`. */
case class OrderConfirmation(deliveryId: Long)
/**
 * Minimal at-least-once delivery example.
 *
 * `SendOrder` and `OrderConfirmation` are persisted as-is; the delivery
 * bookkeeping (`deliver` / `confirmDelivery`) is performed both when handling
 * live commands and when the same events are replayed during recovery.
 */
class OrderProcessor extends PersistentActor with AtLeastOnceDelivery {
  override def persistenceId: String = "order-processor"

  // Delivery tuning for this actor
  override def redeliverInterval: FiniteDuration = 30.seconds
  override def maxUnconfirmedMessages: Int = 10000
  override def warnAfterNumberOfUnconfirmedAttempts: Int = 3

  override def receiveCommand: Receive = {
    case order: SendOrder =>
      // Store the request first, then start the tracked delivery
      persist(order)(sendOrder)

    case ack @ OrderConfirmation(deliveryId) =>
      persist(ack) { _ =>
        val report =
          if (confirmDelivery(deliveryId)) s"Confirmed delivery $deliveryId"
          else s"Delivery $deliveryId was already confirmed or not found"
        println(report)
      }

    case UnconfirmedWarning(pending) =>
      println(s"Warning: ${pending.size} unconfirmed deliveries")
      for (entry <- pending)
        println(s" Delivery ${entry.deliveryId} to ${entry.destination}")
  }

  override def receiveRecover: Receive = {
    case order: SendOrder =>
      sendOrder(order)
    case OrderConfirmation(deliveryId) =>
      confirmDelivery(deliveryId)
  }

  /** Starts a tracked delivery of the order; retried automatically until confirmed. */
  private def sendOrder(order: SendOrder): Unit =
    deliver(order.destination) { deliveryId =>
      OrderMessage(order.orderId, deliveryId)
    }
}
import akka.persistence._
import akka.actor.{ActorRef, ActorPath}
// Events for persistence
/** Base type of every event journaled by `OrderFulfillmentProcessor`. */
sealed trait OrderEvent
/** An order with the given items was accepted. */
case class OrderReceived(orderId: String, items: List[String]) extends OrderEvent
/** The order was dispatched to the warehouse at `warehousePath` under `deliveryId`. */
case class OrderSentToWarehouse(orderId: String, warehousePath: ActorPath, deliveryId: Long) extends OrderEvent
/** The order was dispatched to billing at `billingPath` under `deliveryId`. */
case class OrderSentToBilling(orderId: String, billingPath: ActorPath, deliveryId: Long) extends OrderEvent
/** The warehouse acknowledged the delivery identified by `deliveryId`. */
case class WarehouseConfirmed(deliveryId: Long) extends OrderEvent
/** Billing acknowledged the delivery identified by `deliveryId`. */
case class BillingConfirmed(deliveryId: Long) extends OrderEvent
// Commands
/** Request to start fulfilling an order. */
case class ProcessOrder(orderId: String, items: List[String])
/** Acknowledgement received from the warehouse service. */
case class WarehouseAck(deliveryId: Long)
/** Acknowledgement received from the billing service. */
case class BillingAck(deliveryId: Long)
// Messages sent to other services
/** Sent to the warehouse; `deliveryId` must be echoed back in `WarehouseAck`. */
case class FulfillOrder(orderId: String, items: List[String], deliveryId: Long)
/** Sent to billing; `deliveryId` must be echoed back in `BillingAck`. */
case class ProcessPayment(orderId: String, amount: BigDecimal, deliveryId: Long)
/**
 * Persistent actor that sends every accepted order to the warehouse and to
 * billing with at-least-once delivery.
 *
 * `OrderReceived` is the single event that triggers both deliveries. The same
 * state-update function runs when the event is first persisted and when it is
 * replayed, so recovery re-issues the deliveries in the original order and
 * the journaled `WarehouseConfirmed` / `BillingConfirmed` events then mark
 * the acknowledged ones as done.
 */
class OrderFulfillmentProcessor extends PersistentActor with AtLeastOnceDelivery {
  override def persistenceId: String = "order-fulfillment"

  // Service endpoints
  val warehousePath: ActorPath = ActorPath.fromString("akka://system/user/warehouse")
  val billingPath: ActorPath = ActorPath.fromString("akka://system/user/billing")

  // Configure delivery behavior
  override def redeliverInterval: FiniteDuration = 10.seconds
  override def redeliveryBurstLimit: Int = 100
  override def warnAfterNumberOfUnconfirmedAttempts: Int = 5
  override def maxUnconfirmedMessages: Int = 1000

  var orders = Map.empty[String, OrderState]

  override def receiveCommand: Receive = {
    case ProcessOrder(orderId, items) =>
      persist(OrderReceived(orderId, items))(orderReceived)

    case WarehouseAck(deliveryId) =>
      persist(WarehouseConfirmed(deliveryId)) { _ =>
        if (confirmDelivery(deliveryId)) {
          updateOrderStatus(deliveryId, warehouseComplete = true)
        }
      }

    case BillingAck(deliveryId) =>
      persist(BillingConfirmed(deliveryId)) { _ =>
        if (confirmDelivery(deliveryId)) {
          updateOrderStatus(deliveryId, billingComplete = true)
        }
      }

    case UnconfirmedWarning(unconfirmed) =>
      println(s"${unconfirmed.size} unconfirmed deliveries:")
      unconfirmed.groupBy(_.destination).foreach { case (dest, deliveries) =>
        println(s" ${dest.name}: ${deliveries.size} pending")
      }

    case "status" =>
      sender() ! DeliveryStatus(numberOfUnconfirmed, orders.size)
  }

  override def receiveRecover: Receive = {
    case evt: OrderReceived =>
      orderReceived(evt)
    case _: OrderSentToWarehouse | _: OrderSentToBilling =>
      // Journals written by the earlier version of this example may contain
      // these events. The deliveries they describe are already re-issued when
      // the preceding OrderReceived is replayed, so they are ignored here to
      // avoid sending each message twice.
      ()
    case WarehouseConfirmed(deliveryId) =>
      confirmDelivery(deliveryId)
    case BillingConfirmed(deliveryId) =>
      confirmDelivery(deliveryId)
  }

  /**
   * Applies an `OrderReceived` event: records the order and starts the two
   * tracked deliveries.
   *
   * The `deliver` callbacks must evaluate to the message itself. Calling
   * `persist` inside them (as the earlier version did) makes the callback
   * return `Unit`, so `()` would be delivered instead of the intended message.
   */
  private def orderReceived(evt: OrderReceived): Unit = {
    orders += evt.orderId -> OrderState(evt.orderId, evt.items, warehousePending = true, billingPending = true)
    // Send to warehouse
    deliver(warehousePath) { deliveryId =>
      FulfillOrder(evt.orderId, evt.items, deliveryId)
    }
    // Send to billing
    deliver(billingPath) { deliveryId =>
      ProcessPayment(evt.orderId, calculateTotal(evt.items), deliveryId)
    }
  }

  // Helper methods

  /** Placeholder: intended to update the order that owns `deliveryId`; currently does nothing. */
  private def updateOrderStatus(deliveryId: Long, warehouseComplete: Boolean = false, billingComplete: Boolean = false): Unit = {
    // Find order by scanning unconfirmed deliveries or maintaining lookup table
    // Update order completion status
    // Send completion notification if both warehouse and billing are done
  }

  /** Simplified pricing: a flat 10 per item. */
  private def calculateTotal(items: List[String]): BigDecimal = {
    // Calculate order total
    BigDecimal(items.size * 10) // Simplified
  }
}
/**
 * In-memory state kept per order by `OrderFulfillmentProcessor`.
 *
 * @param orderId          identifier of the order
 * @param items            items contained in the order
 * @param warehousePending flag set to `true` when the order is registered;
 *                         presumably cleared once the warehouse confirms
 * @param billingPending   flag set to `true` when the order is registered;
 *                         presumably cleared once billing confirms
 */
case class OrderState(
  orderId: String,
  items: List[String],
  warehousePending: Boolean,
  billingPending: Boolean
)
/** Reply to the `"status"` query: number of unconfirmed deliveries and number of tracked orders. */
case class DeliveryStatus(unconfirmedCount: Int, activeOrders: Int)

/**
 * Example of snapshotting an actor that uses at-least-once delivery.
 *
 * The delivery state is stored next to the application state in a single
 * `CombinedSnapshot` and handed back through `setDeliverySnapshot` when a
 * snapshot is offered during recovery.
 */
class SnapshotAwareDeliveryActor extends PersistentActor with AtLeastOnceDelivery {
  override def persistenceId: String = "snapshot-delivery"

  var businessState = Map.empty[String, String]

  override def receiveCommand: Receive = {
    case "snapshot" =>
      // Capture application state together with the pending deliveries
      saveSnapshot(CombinedSnapshot(businessState, getDeliverySnapshot))
    case SaveSnapshotSuccess(meta) =>
      println(s"Snapshot saved at sequence ${meta.sequenceNr}")
    case _ => // Handle other messages
  }

  override def receiveRecover: Receive = {
    case SnapshotOffer(_, CombinedSnapshot(savedState, savedDeliveries)) =>
      businessState = savedState
      setDeliverySnapshot(savedDeliveries)
    case _ => // Handle other recovery events
  }
}
/**
 * Snapshot payload used by `SnapshotAwareDeliveryActor`.
 *
 * @param businessState    the actor's application state
 * @param deliverySnapshot delivery state obtained from `getDeliverySnapshot`
 */
case class CombinedSnapshot(
  businessState: Map[String, String],
  deliverySnapshot: AtLeastOnceDeliverySnapshot
)
At-least-once delivery settings can be configured in application.conf:
akka.persistence.at-least-once-delivery {
# Interval between redelivery attempts
redeliver-interval = 5s
# Number of messages sent in one redelivery burst
redelivery-burst-limit = 10000
# After this number of delivery attempts an UnconfirmedWarning message is sent to the actor
warn-after-number-of-unconfirmed-attempts = 5
# Maximum number of unconfirmed messages
max-unconfirmed-messages = 100000
}

Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13