CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-persistence-2-13

Akka Persistence: an event sourcing library for building persistent, fault-tolerant applications with the actor model.

Pending
Overview
Eval results
Files

at-least-once-delivery.mddocs/

At-Least-Once Delivery

Reliable message delivery with automatic redelivery, confirmation tracking, and configurable retry policies.

Capabilities

AtLeastOnceDelivery Trait

Scala API for at-least-once delivery semantics.

/**
 * Scala API for at-least-once delivery semantics.
 *
 * Extends `PersistentActor` and adds the two curried `deliver` overloads below
 * on top of the members declared in `AtLeastOnceDeliveryLike`.
 */
trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike {
  
  /**
   * Deliver message to actor path with delivery confirmation tracking.
   *
   * @param destination path of the actor the message is sent to
   * @param deliveryIdToMessage function that receives the delivery id of this
   *                            delivery and returns the message to send
   */
  def deliver(destination: ActorPath)(deliveryIdToMessage: Long => Any): Unit
  
  /**
   * Deliver message to actor selection with delivery confirmation tracking.
   *
   * @param destination selection of the actor(s) the message is sent to
   * @param deliveryIdToMessage function that receives the delivery id of this
   *                            delivery and returns the message to send
   */
  def deliver(destination: ActorSelection)(deliveryIdToMessage: Long => Any): Unit
}

AtLeastOnceDeliveryLike Core Functionality

Core at-least-once delivery functionality and configuration.

/**
 * Core at-least-once delivery functionality.
 *
 * Declares confirmation and snapshot operations plus four tuning properties.
 * The property defaults shown here are the same values listed for
 * `akka.persistence.at-least-once-delivery` in the Configuration section of
 * this document; each can be overridden per actor.
 */
trait AtLeastOnceDeliveryLike extends Eventsourced {
  
  /**
   * Confirm successful delivery of message by delivery ID.
   *
   * @param deliveryId id that was passed to the `deliveryIdToMessage` function
   * @return `true` when the delivery was confirmed by this call; the usage
   *         example in this document treats `false` as "already confirmed or
   *         not found"
   */
  def confirmDelivery(deliveryId: Long): Boolean
  
  /** Get count of unconfirmed deliveries */
  def numberOfUnconfirmed: Int
  
  /**
   * Get snapshot of current delivery state.
   * Intended to be stored together with the actor's own state (see the
   * "Snapshot Integration" example).
   */
  def getDeliverySnapshot: AtLeastOnceDeliverySnapshot
  
  /**
   * Restore delivery state from snapshot.
   *
   * @param snapshot value previously obtained from `getDeliverySnapshot`
   */
  def setDeliverySnapshot(snapshot: AtLeastOnceDeliverySnapshot): Unit
  
  // Configuration properties with defaults
  /** Interval between redelivery attempts */
  def redeliverInterval: FiniteDuration = 5.seconds
  
  /** Maximum number of messages to redeliver in single burst */
  def redeliveryBurstLimit: Int = 10000
  
  /** Warn after this many unconfirmed delivery attempts */
  def warnAfterNumberOfUnconfirmedAttempts: Int = 5
  
  /** Maximum number of unconfirmed messages allowed */
  def maxUnconfirmedMessages: Int = 100000
}

AbstractPersistentActorWithAtLeastOnceDelivery (Java API)

Java API for at-least-once delivery.

/**
 * Java API for at-least-once delivery.
 *
 * Counterpart of the Scala `AtLeastOnceDelivery` trait: the `deliver`
 * overloads take the message factory as a second, uncurried `Function`
 * argument with a boxed `java.lang.Long` delivery id.
 */
abstract class AbstractPersistentActorWithAtLeastOnceDelivery 
  extends AbstractPersistentActor with AtLeastOnceDeliveryLike {
  
  /**
   * Deliver message to actor path (Java API).
   *
   * @param destination path of the actor the message is sent to
   * @param deliveryIdToMessage function from the delivery id to the message to send
   */
  def deliver(destination: ActorPath, deliveryIdToMessage: Function[java.lang.Long, Any]): Unit
  
  /**
   * Deliver message to actor selection (Java API).
   *
   * @param destination selection of the actor(s) the message is sent to
   * @param deliveryIdToMessage function from the delivery id to the message to send
   */
  def deliver(destination: ActorSelection, deliveryIdToMessage: Function[java.lang.Long, Any]): Unit
}

Delivery State Management

AtLeastOnceDeliverySnapshot

Snapshot representation of delivery state for persistence.

/**
 * Snapshot of at-least-once delivery state.
 *
 * This is the value returned by `getDeliverySnapshot` and accepted by
 * `setDeliverySnapshot`.
 *
 * @param currentDeliveryId delivery id counter captured in the snapshot
 *                          (presumably the most recently assigned id; confirm
 *                          against the Akka API documentation)
 * @param unconfirmedDeliveries deliveries that were still unconfirmed when the
 *                              snapshot was taken
 */
case class AtLeastOnceDeliverySnapshot(
  currentDeliveryId: Long,
  unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]
) {
  /** Java API to get unconfirmed deliveries: the same elements as a `java.util.List` view. */
  def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = 
    unconfirmedDeliveries.asJava
}

UnconfirmedDelivery

Information about pending delivery attempts.

/**
 * Information about an unconfirmed delivery attempt.
 *
 * @param deliveryId id of the delivery, as passed to `deliveryIdToMessage`
 * @param destination actor path the message is addressed to
 * @param message the message being delivered
 */
case class UnconfirmedDelivery(
  deliveryId: Long,
  destination: ActorPath, 
  message: Any
) {
  /** Java API to get message: returns `message` cast to `AnyRef`. */
  def getMessage(): AnyRef = message.asInstanceOf[AnyRef]
}

Delivery Warnings and Exceptions

UnconfirmedWarning

Warning message sent to the delivering actor itself when deliveries are still unconfirmed after `warnAfterNumberOfUnconfirmedAttempts` delivery attempts.

/**
 * Warning about unconfirmed deliveries sent to self.
 *
 * Handled in `receiveCommand` like any other message (see the examples in this
 * document).
 *
 * @param unconfirmedDeliveries the deliveries that are still unconfirmed
 */
case class UnconfirmedWarning(
  unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]
) {
  /** Java API to get unconfirmed deliveries: the same elements as a `java.util.List` view. */
  def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = 
    unconfirmedDeliveries.asJava
}

MaxUnconfirmedMessagesExceededException

Exception thrown when maximum unconfirmed message limit is exceeded.

/**
 * Exception when max unconfirmed messages limit is exceeded
 * (the limit is the `maxUnconfirmedMessages` property).
 *
 * @param message description passed through to `RuntimeException`
 */
class MaxUnconfirmedMessagesExceededException(message: String) 
  extends RuntimeException(message)

Example: Basic At-Least-Once Delivery

import akka.persistence._
import akka.actor.ActorPath
import scala.concurrent.duration._

// Messages

/** Command (also persisted as the event) asking the processor to send `orderId` to `destination`. */
case class SendOrder(orderId: String, destination: ActorPath)
/** Message delivered to the destination; carries the delivery id to echo back in the confirmation. */
case class OrderMessage(orderId: String, deliveryId: Long)
/** Confirmation for the delivery identified by `deliveryId`. */
case class OrderConfirmation(deliveryId: Long)

/**
 * Minimal at-least-once delivery example.
 *
 * Every `SendOrder` is persisted and then handed to `deliver`, which keeps
 * redelivering the resulting `OrderMessage` until an `OrderConfirmation` with
 * the matching delivery id has been persisted and confirmed. Recovery replays
 * the same two steps without the console output.
 */
class OrderProcessor extends PersistentActor with AtLeastOnceDelivery {
  override def persistenceId: String = "order-processor"

  // Delivery tuning for this actor
  override def redeliverInterval: FiniteDuration = 30.seconds
  override def maxUnconfirmedMessages: Int = 10000
  override def warnAfterNumberOfUnconfirmedAttempts: Int = 3

  /** Starts a tracked delivery of `orderId`; shared by the command and recovery paths. */
  private def sendOrder(orderId: String, destination: ActorPath): Unit =
    deliver(destination)(OrderMessage(orderId, _))

  override def receiveCommand: Receive = {
    case cmd @ SendOrder(orderId, destination) =>
      // Deliver only once the event is safely stored; retried until confirmed
      persist(cmd)(_ => sendOrder(orderId, destination))

    case ack @ OrderConfirmation(deliveryId) =>
      persist(ack) { _ =>
        val outcome =
          if (confirmDelivery(deliveryId)) s"Confirmed delivery $deliveryId"
          else s"Delivery $deliveryId was already confirmed or not found"
        println(outcome)
      }

    case UnconfirmedWarning(pending) =>
      println(s"Warning: ${pending.size} unconfirmed deliveries")
      for (entry <- pending)
        println(s"  Delivery ${entry.deliveryId} to ${entry.destination}")
  }

  override def receiveRecover: Receive = {
    case SendOrder(orderId, destination) => sendOrder(orderId, destination)
    case OrderConfirmation(deliveryId)   => confirmDelivery(deliveryId)
  }
}

Example: Order Fulfillment System

import akka.persistence._
import akka.actor.{ActorRef, ActorPath}
import scala.concurrent.duration._

// Events for persistence

/** Closed set of events of the order fulfillment example. */
sealed trait OrderEvent
/** An order with the given items was accepted. */
case class OrderReceived(orderId: String, items: List[String]) extends OrderEvent
/** Records a warehouse delivery: order id, target path and delivery id. */
case class OrderSentToWarehouse(orderId: String, warehousePath: ActorPath, deliveryId: Long) extends OrderEvent
/** Records a billing delivery: order id, target path and delivery id. */
case class OrderSentToBilling(orderId: String, billingPath: ActorPath, deliveryId: Long) extends OrderEvent  
/** The warehouse delivery identified by `deliveryId` was acknowledged. */
case class WarehouseConfirmed(deliveryId: Long) extends OrderEvent
/** The billing delivery identified by `deliveryId` was acknowledged. */
case class BillingConfirmed(deliveryId: Long) extends OrderEvent

// Commands

/** Asks the processor to fulfil the order `orderId` consisting of `items`. */
case class ProcessOrder(orderId: String, items: List[String])
/** Acknowledgement for the warehouse delivery with the given id. */
case class WarehouseAck(deliveryId: Long)
/** Acknowledgement for the billing delivery with the given id. */
case class BillingAck(deliveryId: Long)

// Messages sent to other services

/** Delivered to the warehouse; `deliveryId` is the id to echo back in the acknowledgement. */
case class FulfillOrder(orderId: String, items: List[String], deliveryId: Long)
/** Delivered to billing; `deliveryId` is the id to echo back in the acknowledgement. */
case class ProcessPayment(orderId: String, amount: BigDecimal, deliveryId: Long)

/**
 * Persistent actor that sends every accepted order to the warehouse and to
 * billing with at-least-once delivery.
 *
 * `OrderReceived` is the single event from which deliveries are established:
 * its handler (`startFulfillment`) runs after a successful `persist` and again
 * during recovery, and calls `deliver` for both services. The function passed
 * to `deliver` must return the message itself, so `persist` (which returns
 * `Unit`) is never called from inside it.
 *
 * Acknowledgements are persisted as `WarehouseConfirmed` / `BillingConfirmed`
 * and applied with `confirmDelivery`, both live and during recovery.
 */
class OrderFulfillmentProcessor extends PersistentActor with AtLeastOnceDelivery {
  override def persistenceId: String = "order-fulfillment"

  // Service endpoints
  val warehousePath: ActorPath = ActorPath.fromString("akka://system/user/warehouse")
  val billingPath: ActorPath = ActorPath.fromString("akka://system/user/billing")

  // Configure delivery behavior
  override def redeliverInterval: FiniteDuration = 10.seconds
  override def redeliveryBurstLimit: Int = 100
  override def warnAfterNumberOfUnconfirmedAttempts: Int = 5
  override def maxUnconfirmedMessages: Int = 1000

  var orders = Map.empty[String, OrderState]

  override def receiveCommand: Receive = {
    case ProcessOrder(orderId, items) =>
      // Persist first; deliveries start only once the event is stored
      persist(OrderReceived(orderId, items))(startFulfillment)

    case WarehouseAck(deliveryId) =>
      persist(WarehouseConfirmed(deliveryId)) { _ =>
        if (confirmDelivery(deliveryId)) {
          updateOrderStatus(deliveryId, warehouseComplete = true)
        }
      }

    case BillingAck(deliveryId) =>
      persist(BillingConfirmed(deliveryId)) { _ =>
        if (confirmDelivery(deliveryId)) {
          updateOrderStatus(deliveryId, billingComplete = true)
        }
      }

    case UnconfirmedWarning(unconfirmed) =>
      println(s"${unconfirmed.size} unconfirmed deliveries:")
      unconfirmed.groupBy(_.destination).foreach { case (dest, deliveries) =>
        println(s"  ${dest.name}: ${deliveries.size} pending")
      }

    case "status" =>
      sender() ! DeliveryStatus(numberOfUnconfirmed, orders.size)
  }

  override def receiveRecover: Receive = {
    case evt: OrderReceived =>
      // Same handler as the live path, so deliveries are re-established
      // with the ids assigned by `deliver` during replay
      startFulfillment(evt)

    case _: OrderSentToWarehouse | _: OrderSentToBilling =>
      // Not persisted by this actor: deliveries are derived from OrderReceived.
      // Matched explicitly so such events, if present in an existing journal,
      // are skipped instead of triggering a second delivery.
      ()

    case WarehouseConfirmed(deliveryId) =>
      confirmDelivery(deliveryId)

    case BillingConfirmed(deliveryId) =>
      confirmDelivery(deliveryId)
  }

  /**
   * Event handler for `OrderReceived`: records the order and starts one
   * tracked delivery to the warehouse and one to billing.
   *
   * @param evt the stored (or replayed) order event
   */
  private def startFulfillment(evt: OrderReceived): Unit = {
    orders += evt.orderId ->
      OrderState(evt.orderId, evt.items, warehousePending = true, billingPending = true)

    // Send to warehouse
    deliver(warehousePath) { deliveryId =>
      FulfillOrder(evt.orderId, evt.items, deliveryId)
    }

    // Send to billing
    deliver(billingPath) { deliveryId =>
      ProcessPayment(evt.orderId, calculateTotal(evt.items), deliveryId)
    }
  }

  // Helper methods

  /**
   * Placeholder for marking one leg of an order as complete.
   *
   * @param deliveryId id of the delivery that was just confirmed
   * @param warehouseComplete set when the warehouse leg was confirmed
   * @param billingComplete set when the billing leg was confirmed
   */
  private def updateOrderStatus(deliveryId: Long, warehouseComplete: Boolean = false, billingComplete: Boolean = false): Unit = {
    // Find order by scanning unconfirmed deliveries or maintaining lookup table
    // Update order completion status
    // Send completion notification if both warehouse and billing are done
  }

  /** Simplified order total: a flat 10 per item. */
  private def calculateTotal(items: List[String]): BigDecimal =
    BigDecimal(items.size * 10)
}

/**
 * Per-order bookkeeping kept in the processor's `orders` map.
 *
 * @param orderId id of the order
 * @param items items of the order
 * @param warehousePending flag for the warehouse leg; created as `true` in the example
 * @param billingPending flag for the billing leg; created as `true` in the example
 */
case class OrderState(
  orderId: String, 
  items: List[String], 
  warehousePending: Boolean, 
  billingPending: Boolean
)

/** Reply to the `"status"` query: number of unconfirmed deliveries and number of tracked orders. */
case class DeliveryStatus(unconfirmedCount: Int, activeOrders: Int)

Snapshot Integration

/**
 * Shows how to keep delivery state in snapshots.
 *
 * On `"snapshot"` the actor stores its business state together with the value
 * of `getDeliverySnapshot`; on recovery a matching `SnapshotOffer` restores
 * both parts, the delivery part through `setDeliverySnapshot`.
 */
class SnapshotAwareDeliveryActor extends PersistentActor with AtLeastOnceDelivery {
  override def persistenceId: String = "snapshot-delivery"

  var businessState = Map.empty[String, String]

  override def receiveCommand: Receive = {
    case "snapshot" =>
      // Business state and delivery state travel in one snapshot object
      saveSnapshot(CombinedSnapshot(businessState, getDeliverySnapshot))

    case SaveSnapshotSuccess(meta) =>
      println(s"Snapshot saved at sequence ${meta.sequenceNr}")

    case _ => // Handle other messages
  }

  override def receiveRecover: Receive = {
    case SnapshotOffer(_, CombinedSnapshot(restoredState, restoredDeliveries)) =>
      businessState = restoredState
      setDeliverySnapshot(restoredDeliveries)

    case _ => // Handle other recovery events
  }
}

/**
 * Snapshot payload that bundles application state with delivery state.
 *
 * @param businessState the actor's own state
 * @param deliverySnapshot value obtained from `getDeliverySnapshot`, to be
 *                         passed back to `setDeliverySnapshot` on recovery
 */
case class CombinedSnapshot(
  businessState: Map[String, String],
  deliverySnapshot: AtLeastOnceDeliverySnapshot
)

Configuration

At-least-once delivery settings can be configured in application.conf:

# Settings for at-least-once delivery. The values shown are the same as the
# defaults of the corresponding AtLeastOnceDeliveryLike properties listed above.
akka.persistence.at-least-once-delivery {
  # Interval between redelivery attempts
  redeliver-interval = 5s
  
  # Maximum number of messages sent in one redelivery burst
  redelivery-burst-limit = 10000
  
  # After this number of delivery attempts an UnconfirmedWarning message
  # is sent to the actor itself
  warn-after-number-of-unconfirmed-attempts = 5
  
  # Maximum number of unconfirmed messages allowed
  max-unconfirmed-messages = 100000
}

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13

docs

at-least-once-delivery.md

durable-state.md

event-adapters.md

index.md

journal-api.md

persistent-actors.md

plugin-development.md

snapshots.md

tile.json